You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2022/05/06 18:31:26 UTC

[kafka] branch trunk updated: KAFKA-10888: Sticky partition leads to uneven produce msg (#12049)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f7db6031b8 KAFKA-10888: Sticky partition leads to uneven produce msg (#12049)
f7db6031b8 is described below

commit f7db6031b84a136ad0e257df722b20faa7c37b8a
Author: Artem Livshits <84...@users.noreply.github.com>
AuthorDate: Fri May 6 11:31:12 2022 -0700

    KAFKA-10888: Sticky partition leads to uneven produce msg (#12049)
    
    The design is described in detail in KIP-794
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner.
    
    Implementation notes:
    
    The default partitioning logic is moved to the BuiltInPartitioner class
    (there is one object per topic).  The object keeps track of how many
    bytes are produced per-partition and once the amount exceeds batch.size,
    switches to the next partition (note that partition switch decision is
    decoupled from batching).  The object also keeps track of probability
    weights that are based on the queue sizes (the larger the queue size
    is the less chance for the next partition to be chosen).  The queue
    sizes are calculated in the RecordAccumulator in the `ready` method,
    the method already enumerates all partitions so we just add some extra
    logic into the existing O(N) method.  The partition switch decision may
    take O(logN), where N is the number partitions per topic, but it happens
    only once per batch.size (and the logic is avoided when all queues are
    of equal size).  Produce bytes accounting logic is lock-free.
    
    When partitioner.availability.timeout.ms is non-0, RecordAccumulator
    keeps stats on "node latency" which is defined as the difference between
    the last time the node had a batch waiting to be send and the last time
    the node was ready to take a new batch.  If this difference exceeds
    partitioner.availability.timeout.ms we don't switch to that partition
    until the node is ready.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      | 174 ++++--
 .../kafka/clients/producer/MockProducer.java       |  17 +-
 .../apache/kafka/clients/producer/Partitioner.java |   6 +-
 .../kafka/clients/producer/ProducerConfig.java     |  35 +-
 .../clients/producer/UniformStickyPartitioner.java |   6 +
 .../clients/producer/internals/BufferPool.java     |   3 +-
 .../producer/internals/BuiltInPartitioner.java     | 297 +++++++++++
 .../producer/internals/DefaultPartitioner.java     |   5 +
 .../producer/internals/RecordAccumulator.java      | 591 ++++++++++++++++-----
 .../kafka/clients/producer/internals/Sender.java   |   8 +
 .../java/org/apache/kafka/common/utils/Utils.java  |   2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 100 ++--
 .../kafka/clients/producer/MockProducerTest.java   |   3 +-
 .../producer/UniformStickyPartitionerTest.java     |   5 +-
 .../producer/internals/BuiltInPartitionerTest.java | 198 +++++++
 .../producer/internals/DefaultPartitionerTest.java |   1 +
 .../producer/internals/RecordAccumulatorTest.java  | 289 +++++++---
 .../clients/producer/internals/SenderTest.java     | 130 ++++-
 .../producer/internals/TransactionManagerTest.java |  30 +-
 .../org/apache/kafka/streams/kstream/KStream.java  |   1 -
 .../org/apache/kafka/streams/kstream/Produced.java |   1 -
 .../kafka/streams/kstream/Repartitioned.java       |   1 -
 .../kafka/streams/processor/StreamPartitioner.java |   3 +-
 .../internals/DefaultStreamPartitioner.java        |   9 +-
 .../internals/WindowedStreamPartitionerTest.java   |   4 +-
 .../processor/internals/RecordCollectorTest.java   |   9 +-
 .../org/apache/kafka/test/MockClientSupplier.java  |   3 +-
 27 files changed, 1571 insertions(+), 360 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 85a3e239e9..9c6a728899 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -254,6 +254,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final Serializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
+    private final boolean partitionerIgnoreKeys;
     private final ProducerInterceptors<K, V> interceptors;
     private final ApiVersions apiVersions;
     private final TransactionManager transactionManager;
@@ -316,6 +317,23 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         this(Utils.propsToMap(properties), keySerializer, valueSerializer);
     }
 
+    /**
+     * Check if partitioner is deprecated and log a warning if it is.
+     */
+    @SuppressWarnings("deprecation")
+    private void warnIfPartitionerDeprecated() {
+        // Using DefaultPartitioner and UniformStickyPartitioner is deprecated, see KIP-794.
+        if (partitioner instanceof org.apache.kafka.clients.producer.internals.DefaultPartitioner) {
+            log.warn("DefaultPartitioner is deprecated.  Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG
+                    + " configuration setting to get the default partitioning behavior");
+        }
+        if (partitioner instanceof org.apache.kafka.clients.producer.UniformStickyPartitioner) {
+            log.warn("UniformStickyPartitioner is deprecated.  Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG
+                    + " configuration setting and set " + ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG
+                    + " to 'true' to get the uniform sticky partitioning behavior");
+        }
+    }
+
     // visible for testing
     @SuppressWarnings("unchecked")
     KafkaProducer(ProducerConfig config,
@@ -360,6 +378,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     ProducerConfig.PARTITIONER_CLASS_CONFIG,
                     Partitioner.class,
                     Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
+            warnIfPartitionerDeprecated();
+            this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             if (keySerializer == null) {
                 this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -397,12 +417,20 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
             this.apiVersions = new ApiVersions();
             this.transactionManager = configureTransactionState(config, logContext);
+            // There is no need to do work required for adaptive partitioning, if we use a custom partitioner.
+            boolean enableAdaptivePartitioning = partitioner == null &&
+                config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
+            RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
+                enableAdaptivePartitioning,
+                config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
+            );
             this.accumulator = new RecordAccumulator(logContext,
                     config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                     this.compressionType,
                     lingerMs(config),
                     retryBackoffMs,
                     deliveryTimeoutMs,
+                    partitionerConfig,
                     metrics,
                     PRODUCER_METRIC_GROUP_NAME,
                     time,
@@ -468,6 +496,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
         this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
         this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+        this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
         this.apiVersions = new ApiVersions();
         this.transactionManager = transactionManager;
         this.accumulator = accumulator;
@@ -922,11 +951,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             throw new IllegalStateException("Cannot perform operation after producer has been closed");
     }
 
+    /**
+     * Call deprecated {@link Partitioner#onNewBatch}
+     */
+    @SuppressWarnings("deprecation")
+    private void onNewBatch(String topic, Cluster cluster, int prevPartition) {
+        assert partitioner != null;
+        partitioner.onNewBatch(topic, cluster, prevPartition);
+    }
+
     /**
      * Implementation of asynchronously send a record to a topic.
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
-        TopicPartition tp = null;
+        // Append callback takes care of the following:
+        //  - call interceptors and user callback on completion
+        //  - remember partition that is calculated in RecordAccumulator.append
+        AppendCallbacks<K, V> appendCallbacks = new AppendCallbacks<K, V>(callback, this.interceptors, record);
+
         try {
             throwIfProducerClosed();
             // first make sure the metadata for the topic is available
@@ -958,8 +1000,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer", cce);
             }
+
+            // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,
+            // which means that the RecordAccumulator would pick a partition using built-in logic (which may
+            // take into account broker load, the amount of data produced to each partition, etc.).
             int partition = partition(record, serializedKey, serializedValue, cluster);
-            tp = new TopicPartition(record.topic(), partition);
 
             setReadOnly(record.headers());
             Header[] headers = record.headers().toArray();
@@ -968,41 +1013,38 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     compressionType, serializedKey, serializedValue, headers);
             ensureValidRecordSize(serializedSize);
             long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
-            if (log.isTraceEnabled()) {
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            }
-            // producer callback will make sure to call both 'callback' and interceptor callback
-            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
-                    serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
+            // A custom partitioner may take advantage on the onNewBatch callback.
+            boolean abortOnNewBatch = partitioner != null;
+
+            // Append the record to the accumulator.  Note, that the actual partition may be
+            // calculated there and can be accessed via appendCallbacks.topicPartition.
+            RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
+                    serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
+            assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;
 
             if (result.abortForNewBatch) {
                 int prevPartition = partition;
-                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
+                onNewBatch(record.topic(), cluster, prevPartition);
                 partition = partition(record, serializedKey, serializedValue, cluster);
-                tp = new TopicPartition(record.topic(), partition);
                 if (log.isTraceEnabled()) {
                     log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                 }
-                // producer callback will make sure to call both 'callback' and interceptor callback
-                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
-
-                result = accumulator.append(tp, timestamp, serializedKey,
-                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
+                result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
+                    serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
             }
 
             // Add the partition to the transaction (if in progress) after it has been successfully
-            // appended to the accumulator. We cannot do it before because the initially selected
-            // partition may be changed when the batch is closed (as indicated by `abortForNewBatch`).
-            // Note that the `Sender` will refuse to dequeue batches from the accumulator until they
-            // have been added to the transaction.
+            // appended to the accumulator. We cannot do it before because the partition may be
+            // unknown or the initially selected partition may be changed when the batch is closed
+            // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
+            // batches from the accumulator until they have been added to the transaction.
             if (transactionManager != null) {
-                transactionManager.maybeAddPartition(tp);
+                transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
             }
 
             if (result.batchIsFull || result.newBatchCreated) {
-                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
+                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
                 this.sender.wakeup();
             }
             return result.future;
@@ -1011,33 +1053,28 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             // for other exceptions throw directly
         } catch (ApiException e) {
             log.debug("Exception occurred during message send:", e);
-            // producer callback will make sure to call both 'callback' and interceptor callback
-            if (tp == null) {
-                // set topicPartition to -1 when null
-                tp = ProducerInterceptors.extractTopicPartition(record);
-            }
-
             if (callback != null) {
+                TopicPartition tp = appendCallbacks.topicPartition();
                 RecordMetadata nullMetadata = new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
                 callback.onCompletion(nullMetadata, e);
             }
             this.errors.record();
-            this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
             if (transactionManager != null) {
                 transactionManager.maybeTransitionToErrorState(e);
             }
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
-            this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
             throw new InterruptException(e);
         } catch (KafkaException e) {
             this.errors.record();
-            this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
             throw e;
         } catch (Exception e) {
             // we notify interceptor about all exceptions, since onSend is called before anything else in this method
-            this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
             throw e;
         }
     }
@@ -1321,21 +1358,33 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     /**
      * computes partition for given record.
      * if the record has partition returns the value otherwise
-     * calls configured partitioner class to compute the partition.
+     * if custom partitioner is specified, call it to compute partition
+     * otherwise try to calculate partition based on key.
+     * If there is no key or key should be ignored return
+     * RecordMetadata.UNKNOWN_PARTITION to indicate any partition
+     * can be used (the partition is then calculated by built-in
+     * partitioning logic).
      */
     private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
-        Integer partition = record.partition();
-        if (partition != null) {
-            return partition;
-        }
+        if (record.partition() != null)
+            return record.partition();
 
-        int customPartition = partitioner.partition(
+        if (partitioner != null) {
+            int customPartition = partitioner.partition(
                 record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
-        if (customPartition < 0) {
-            throw new IllegalArgumentException(String.format(
+            if (customPartition < 0) {
+                throw new IllegalArgumentException(String.format(
                     "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
+            }
+            return customPartition;
+        }
+
+        if (serializedKey != null && !partitionerIgnoreKeys) {
+            // hash the keyBytes to choose a partition
+            return Utils.toPositive(Utils.murmur2(serializedKey)) % cluster.partitionsForTopic(record.topic()).size();
+        } else {
+            return RecordMetadata.UNKNOWN_PARTITION;
         }
-        return customPartition;
     }
 
     private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
@@ -1403,25 +1452,54 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     /**
-     * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and
-     * notifies producer interceptors about the request completion.
+     * Callbacks that are called by the RecordAccumulator append functions:
+     *  - user callback
+     *  - interceptor callbacks
+     *  - partition callback
      */
-    private static class InterceptorCallback<K, V> implements Callback {
+    private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final TopicPartition tp;
+        private final ProducerRecord<K, V> record;
+        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
-        private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
+        private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.tp = tp;
+            this.record = record;
         }
 
+        @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) {
-            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+            if (metadata == null) {
+                metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+            }
             this.interceptors.onAcknowledgement(metadata, exception);
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
         }
+
+        @Override
+        public void setPartition(int partition) {
+            assert partition != RecordMetadata.UNKNOWN_PARTITION;
+            this.partition = partition;
+
+            if (log.isTraceEnabled()) {
+                // Log the message here, because we don't know the partition before that.
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+            }
+        }
+
+        public int getPartition() {
+            return partition;
+        }
+
+        public TopicPartition topicPartition() {
+            if (record == null)
+                return null;
+            return partition == RecordMetadata.UNKNOWN_PARTITION
+                    ? ProducerInterceptors.extractTopicPartition(record)
+                    : new TopicPartition(record.topic(), partition);
+        }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 4fd540dcea..3df73b20a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.Cluster;
@@ -117,10 +116,24 @@ public class MockProducer<K, V> implements Producer<K, V> {
      *
      * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
      */
+    @SuppressWarnings("deprecation")
     public MockProducer(final boolean autoComplete,
                         final Serializer<K> keySerializer,
                         final Serializer<V> valueSerializer) {
-        this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
+        this(Cluster.empty(), autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer);
+    }
+
+    /**
+     * Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers.
+     *
+     * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(cluster, autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
+     */
+    @SuppressWarnings("deprecation")
+    public MockProducer(final Cluster cluster,
+                        final boolean autoComplete,
+                        final Serializer<K> keySerializer,
+                        final Serializer<V> valueSerializer) {
+        this(cluster, autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
index 13eaa5aaea..eeafc73d66 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -44,12 +44,16 @@ public interface Partitioner extends Configurable, Closeable {
     void close();
 
     /**
+     * Note this method is only implemented in DefatultPartitioner and UniformStickyPartitioner which
+     * are now deprecated.  See KIP-794 for more info.
+     *
      * Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
-     * this method can change the chosen sticky partition for the new batch. 
+     * this method can change the chosen sticky partition for the new batch.
      * @param topic The topic name
      * @param cluster The current cluster metadata
      * @param prevPartition The partition previously selected for the record that triggered a new batch
      */
+    @Deprecated
     default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 2d586f255c..25c718a9f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -93,6 +92,26 @@ public class ProducerConfig extends AbstractConfig {
                                                  + "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated "
                                                  + "batch size is under this <code>batch.size</code> setting.";
 
+    /** <code>partitioner.adaptive.partitioning.enable</code> */
+    public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";
+    private static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC =
+            "When set to 'true', the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers. "
+            + "If 'false', producer will try to distribute messages uniformly. Note: this setting has no effect if a custom partitioner is used";
+
+    /** <code>partitioner.availability.timeout.ms</code> */
+    public static final String PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG = "partitioner.availability.timeout.ms";
+    private static final String PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC =
+            "If a broker cannot process produce requests from a partition for <code>" + PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG + "</code> time, "
+            + "the partitioner treats that partition as not available.  If the value is 0, this logic is disabled. "
+            + "Note: this setting has no effect if a custom partitioner is used or <code>" + PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG
+            + "<code/> is set to 'false'";
+
+    /** <code>partitioner.ignore.keys</code> */
+    public static final String PARTITIONER_IGNORE_KEYS_CONFIG = "partitioner.ignore.keys";
+    private static final String PARTITIONER_IGNORE_KEYS_DOC = "When set to 'true' the producer won't use record keys to choose a partition. "
+            + "If 'false', producer would choose a partition based on a hash of the key when a key is present. "
+            + "Note: this setting has no effect if a custom partitioner is used.";
+
     /** <code>acks</code> */
     public static final String ACKS_CONFIG = "acks";
     private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
@@ -259,11 +278,11 @@ public class ProducerConfig extends AbstractConfig {
     public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
     private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" +
         "<ul>" +
-            "<li><code>org.apache.kafka.clients.producer.internals.DefaultPartitioner</code>: The default partitioner. " +
-        "This strategy will try sticking to a partition until the batch is full, or <code>linger.ms</code> is up. It works with the strategy:" +
+            "<li>If not set, the default partitioning logic is used. " +
+        "This strategy will try sticking to a partition until " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
                 "<ul>" +
                     "<li>If no partition is specified but a key is present, choose a partition based on a hash of the key</li>" +
-                    "<li>If no partition or key is present, choose the sticky partition that changes when the batch is full, or <code>linger.ms</code> is up.</li>" +
+                    "<li>If no partition or key is present, choose the sticky partition that changes when " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +
                 "</ul>" +
             "</li>" +
             "<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that " +
@@ -271,9 +290,6 @@ public class ProducerConfig extends AbstractConfig {
         "until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +
         "Please check KAFKA-9965 for more detail." +
             "</li>" +
-            "<li><code>org.apache.kafka.clients.producer.UniformStickyPartitioner</code>: This partitioning strategy will " +
-        "try sticking to a partition(no matter if the 'key' is provided or not) until the batch is full, or <code>linger.ms</code> is up." +
-            "</li>" +
         "</ul>" +
         "<p>Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.";
 
@@ -334,6 +350,9 @@ public class ProducerConfig extends AbstractConfig {
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
+                                .define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
+                                .define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
+                                .define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC)
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
@@ -418,7 +437,7 @@ public class ProducerConfig extends AbstractConfig {
                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(PARTITIONER_CLASS_CONFIG,
                                         Type.CLASS,
-                                        DefaultPartitioner.class,
+                                        null,
                                         Importance.MEDIUM, PARTITIONER_CLASS_DOC)
                                 .define(INTERCEPTOR_CLASSES_CONFIG,
                                         Type.LIST,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/UniformStickyPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/UniformStickyPartitioner.java
index be11d0b662..6e4fe420df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/UniformStickyPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/UniformStickyPartitioner.java
@@ -23,6 +23,10 @@ import org.apache.kafka.common.Cluster;
 
 
 /**
+ * NOTE this partitioner is deprecated and shouldn't be used.  To use default partitioning logic
+ * remove partitioner.class configuration setting and set partitioner.ignore.keys=true.
+ * See KIP-794 for more info.
+ *
  * The partitioning strategy:
  * <ul>
  * <li>If a partition is specified in the record, use it
@@ -33,6 +37,7 @@ import org.apache.kafka.common.Cluster;
  * 
  * See KIP-480 for details about sticky partitioning.
  */
+@Deprecated
 public class UniformStickyPartitioner implements Partitioner {
 
     private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
@@ -59,6 +64,7 @@ public class UniformStickyPartitioner implements Partitioner {
      * If a batch completed for the current sticky partition, change the sticky partition. 
      * Alternately, if no sticky partition has been determined, set one.
      */
+    @SuppressWarnings("deprecation")
     public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
         stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 210911ada3..67cf485f81 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -279,7 +279,8 @@ public class BufferPool {
     }
 
     public void deallocate(ByteBuffer buffer) {
-        deallocate(buffer, buffer.capacity());
+        if (buffer != null)
+            deallocate(buffer, buffer.capacity());
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
new file mode 100644
index 0000000000..1c2d10f3f6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner.  Note, that this is just a utility class that is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for adaptive sticky partitioning
+ * (described in detail in KIP-794).  There is one partitioner object per topic.
+ */
+public class BuiltInPartitioner {
+    private final Logger log;
+    private final String topic;
+    private final int stickyBatchSize;
+
+    private volatile PartitionLoadStats partitionLoadStats = null;
+    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
+
+    // Visible and used for testing only.
+    static volatile public Supplier<Integer> mockRandom = null;
+
+    /**
+     * BuiltInPartitioner constructor.
+     *
+     * @param topic The topic
+     * @param stickyBatchSize How much to produce to partition before switch
+     */
+    public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
+        this.log = logContext.logger(BuiltInPartitioner.class);
+        this.topic = topic;
+        this.stickyBatchSize = stickyBatchSize;
+    }
+
+    /**
+     * Calculate the next partition for the topic based on the partition load stats.
+     */
+    private int nextPartition(Cluster cluster) {
+        int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+        // Cache volatile variable in local variable.
+        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+        int partition;
+
+        if (partitionLoadStats == null) {
+            // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
+            // partition based on uniform distribution.
+            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
+            if (availablePartitions.size() > 0) {
+                partition = availablePartitions.get(random % availablePartitions.size()).partition();
+            } else {
+                // We don't have available partitions, just pick one among all partitions.
+                List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+                partition = random % partitions.size();
+            }
+        } else {
+            // Calculate next partition based on load distribution.
+            // Note that partitions without leader are excluded from the partitionLoadStats.
+            assert partitionLoadStats.length > 0;
+
+            int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;
+            int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];
+
+            // By construction, the cumulative frequency table is sorted, so we can use binary
+            // search to find the desired index.
+            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);
+
+            // binarySearch results the index of the found element, or -(insertion_point) - 1
+            // (where insertion_point is the index of the first element greater than the key).
+            // We need to get the index of the first value that is strictly greater, which
+            // would be the insertion point, except if we found the element that's equal to
+            // the searched value (in this case we need to get next).  For example, if we have
+            //  4 5 8
+            // and we're looking for 3, then we'd get the insertion_point = 0, and the function
+            // would return -0 - 1 = -1, by adding 1 we'd get 0.  If we're looking for 4, we'd
+            // get 0, and we need the next one, so adding 1 works here as well.
+            int partitionIndex = Math.abs(searchResult + 1);
+            assert partitionIndex < partitionLoadStats.length;
+            partition = partitionLoadStats.partitionIds[partitionIndex];
+        }
+
+        log.trace("Switching to partition {} in topic {}", partition, topic);
+        return partition;
+    }
+
+    /**
+     * Test-only function.  When partition load stats are defined, return the end of range for the
+     * random number.
+     */
+    public int loadStatsRangeEnd() {
+        assert partitionLoadStats != null;
+        assert partitionLoadStats.length > 0;
+        return partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
+    }
+
+    /**
+     * Peek currently chosen sticky partition.  This method works in conjunction with {@link #isPartitionChanged}
+     * and {@link #updatePartitionInfo}.  The workflow is the following:
+     *
+     * 1. peekCurrentPartitionInfo is called to know which partition to lock.
+     * 2. Lock partition's batch queue.
+     * 3. isPartitionChanged under lock to make sure that nobody raced us.
+     * 4. Append data to buffer.
+     * 5. updatePartitionInfo to update produced bytes and maybe switch partition.
+     *
+     *  It's important that steps 3-5 are under partition's batch queue lock.
+     *
+     * @param cluster The cluster information (needed if there is no current partition)
+     * @return sticky partition info object
+     */
+    StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
+        StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+        if (partitionInfo != null)
+            return partitionInfo;
+
+        // We're the first to create it.
+        partitionInfo = new StickyPartitionInfo(nextPartition(cluster));
+        if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
+            return partitionInfo;
+
+        // Someone has raced us.
+        return stickyPartitionInfo.get();
+    }
+
+    /**
+     * Check if partition is changed by a concurrent thread.  NOTE this function needs to be called under
+     * the partition's batch queue lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+     * @return true if sticky partition object is changed (race condition)
+     */
+    boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
+        // partitionInfo may be null if the caller didn't use built-in partitioner.
+        return partitionInfo != null && stickyPartitionInfo.get() != partitionInfo;
+    }
+
+    /**
+     * Update partition info with the number of bytes appended and maybe switch partition.
+     * NOTE this function needs to be called under the partition's batch queue lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+     * @param appendedBytes The number of bytes appended to this partition
+     * @param cluster The cluster information
+     */
+    void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) {
+        // partitionInfo may be null if the caller didn't use built-in partitioner.
+        if (partitionInfo == null)
+            return;
+
+        assert partitionInfo == stickyPartitionInfo.get();
+        int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes);
+        if (producedBytes >= stickyBatchSize) {
+            // We've produced enough to this partition, switch to next.
+            StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(nextPartition(cluster));
+            stickyPartitionInfo.set(newPartitionInfo);
+        }
+    }
+
+    /**
+     * Update partition load stats from the queue sizes of each partition
+     * NOTE: queueSizes are modified in place to avoid allocations
+     *
+     * @param queueSizes The queue sizes, partitions without leaders are excluded
+     * @param partitionIds The partition ids for the queues, partitions without leaders are excluded
+     * @param length The logical length of the arrays (could be less): we may eliminate some partitions
+     *               based on latency, but to avoid reallocation of the arrays, we just decrement
+     *               logical length
+     * Visible for testing
+     */
+    public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int length) {
+        if (queueSizes == null) {
+            log.trace("No load stats for topic {}, not using adaptive", topic);
+            partitionLoadStats = null;
+            return;
+        }
+        assert queueSizes.length == partitionIds.length;
+        assert length <= queueSizes.length;
+
+        // The queueSizes.length represents the number of all partitions in the topic and if we have
+        // less than 2 partitions, there is no need to do adaptive logic.
+        // If partitioner.availability.timeout.ms != 0, then partitions that experience high latencies
+        // (greater than partitioner.availability.timeout.ms) may be excluded, the length represents
+        // partitions that are not excluded.  If some partitions were excluded, we'd still want to
+        // go through adaptive logic, even if we have one partition.
+        // See also RecordAccumulator#partitionReady where the queueSizes are built.
+        if (length < 1 || queueSizes.length < 2) {
+            log.trace("The number of partitions is too small: available={}, all={}, not using adaptive for topic {}",
+                    length, queueSizes.length, topic);
+            partitionLoadStats = null;
+            return;
+        }
+
+        // We build cumulative frequency table from the queue sizes in place.  At the beginning
+        // each entry contains queue size, then we invert it (so it represents the frequency)
+        // and convert to a running sum.  Then a uniformly distributed random variable
+        // in the range [0..last) would map to a partition with weighted probability.
+        // Example: suppose we have 3 partitions with the corresponding queue sizes:
+        //  0 3 1
+        // Then we can invert them by subtracting the queue size from the max queue size + 1 = 4:
+        //  4 1 3
+        // Then we can convert it into a running sum (next value adds previous value):
+        //  4 5 8
+        // Now if we get a random number in the range [0..8) and find the first value that
+        // is strictly greater than the number (e.g. for 4 it would be 5), then the index of
+        // the value is the index of the partition we're looking for.  In this example
+        // random numbers 0, 1, 2, 3 would map to partition[0], 4 would map to partition[1]
+        // and 5, 6, 7 would map to partition[2].
+
+        // Calculate max queue size + 1 and check if all sizes are the same.
+        int maxSizePlus1 = queueSizes[0];
+        boolean allEqual = true;
+        for (int i = 1; i < length; i++) {
+            if (queueSizes[i] != maxSizePlus1)
+                allEqual = false;
+            if (queueSizes[i] > maxSizePlus1)
+                maxSizePlus1 = queueSizes[i];
+        }
+        ++maxSizePlus1;
+
+        if (allEqual && length == queueSizes.length) {
+            // No need to have complex probability logic when all queue sizes are the same,
+            // and we didn't exclude partitions that experience high latencies (greater than
+            // partitioner.availability.timeout.ms).
+            log.trace("All queue lengths are the same, not using adaptive for topic {}", topic);
+            partitionLoadStats = null;
+            return;
+        }
+
+        // Invert and fold the queue size, so that they become separator values in the CFT.
+        queueSizes[0] = maxSizePlus1 - queueSizes[0];
+        for (int i = 1; i < length; i++) {
+            queueSizes[i] = maxSizePlus1 - queueSizes[i] + queueSizes[i - 1];
+        }
+        log.trace("Partition load stats for topic {}: CFT={}, IDs={}, length={}",
+                topic, queueSizes, partitionIds, length);
+        partitionLoadStats = new PartitionLoadStats(queueSizes, partitionIds, length);
+    }
+
+    /**
+     * Info for the current sticky partition.
+     */
+    public static class StickyPartitionInfo {
+        private final int index;
+        private final AtomicInteger producedBytes = new AtomicInteger();
+
+        StickyPartitionInfo(int index) {
+            this.index = index;
+        }
+
+        public int partition() {
+            return index;
+        }
+    }
+
+    /**
+     * The partition load stats for each topic that are used for adaptive partition distribution.
+     */
+    private final static class PartitionLoadStats {
+        public final int[] cumulativeFrequencyTable;
+        public final int[] partitionIds;
+        public final int length;
+        public PartitionLoadStats(int[] cumulativeFrequencyTable, int[] partitionIds, int length) {
+            assert cumulativeFrequencyTable.length == partitionIds.length;
+            assert length <= cumulativeFrequencyTable.length;
+            this.cumulativeFrequencyTable = cumulativeFrequencyTable;
+            this.partitionIds = partitionIds;
+            this.length = length;
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index cf765d1eee..2c2e79fb20 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -23,6 +23,9 @@ import org.apache.kafka.common.utils.Utils;
 import java.util.Map;
 
 /**
+ * NOTE this partitioner is deprecated and shouldn't be used.  To use default partitioning logic
+ * remove partitioner.class configuration setting.  See KIP-794 for more info.
+ *
  * The default partitioning strategy:
  * <ul>
  * <li>If a partition is specified in the record, use it
@@ -31,6 +34,7 @@ import java.util.Map;
  * 
  * See KIP-480 for details about sticky partitioning.
  */
+@Deprecated
 public class DefaultPartitioner implements Partitioner {
 
     private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
@@ -77,6 +81,7 @@ public class DefaultPartitioner implements Partitioner {
      * If a batch completed for the current sticky partition, change the sticky partition. 
      * Alternately, if no sticky partition has been determined, set one.
      */
+    @SuppressWarnings("deprecation")
     public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
         stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 063d117502..ef1ff7d7d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -29,8 +29,10 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -65,6 +67,7 @@ import org.slf4j.Logger;
  */
 public class RecordAccumulator {
 
+    private final LogContext logContext;
     private final Logger log;
     private volatile boolean closed;
     private final AtomicInteger flushesInProgress;
@@ -74,10 +77,13 @@ public class RecordAccumulator {
     private final int lingerMs;
     private final long retryBackoffMs;
     private final int deliveryTimeoutMs;
+    private final long partitionAvailabilityTimeoutMs;  // latency threshold for marking partition temporary unavailable
+    private final boolean enableAdaptivePartitioning;
     private final BufferPool free;
     private final Time time;
     private final ApiVersions apiVersions;
-    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
+    private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = new CopyOnWriteMap<>();
+    private final ConcurrentMap<Integer /*nodeId*/, NodeLatencyStats> nodeStats = new CopyOnWriteMap<>();
     private final IncompleteBatches incomplete;
     // The following variables are only accessed by the sender thread, so we don't need to protect them.
     private final Set<TopicPartition> muted;
@@ -96,11 +102,15 @@ public class RecordAccumulator {
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
      * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
      *        exhausting all retries in a short period of time.
+     * @param deliveryTimeoutMs An upper bound on the time to report success or failure on record delivery
+     * @param partitionerConfig Partitioner config
      * @param metrics The metrics
+     * @param metricGrpName The metric group name
      * @param time The time instance to use
      * @param apiVersions Request API versions for current connected brokers
      * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
      *                           numbers per partition.
+     * @param bufferPool The buffer pool
      */
     public RecordAccumulator(LogContext logContext,
                              int batchSize,
@@ -108,12 +118,14 @@ public class RecordAccumulator {
                              int lingerMs,
                              long retryBackoffMs,
                              int deliveryTimeoutMs,
+                             PartitionerConfig partitionerConfig,
                              Metrics metrics,
                              String metricGrpName,
                              Time time,
                              ApiVersions apiVersions,
                              TransactionManager transactionManager,
                              BufferPool bufferPool) {
+        this.logContext = logContext;
         this.log = logContext.logger(RecordAccumulator.class);
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
@@ -123,7 +135,8 @@ public class RecordAccumulator {
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
         this.deliveryTimeoutMs = deliveryTimeoutMs;
-        this.batches = new CopyOnWriteMap<>();
+        this.enableAdaptivePartitioning = partitionerConfig.enableAdaptivePartitioning;
+        this.partitionAvailabilityTimeoutMs = partitionerConfig.partitionAvailabilityTimeoutMs;
         this.free = bufferPool;
         this.incomplete = new IncompleteBatches();
         this.muted = new HashSet<>();
@@ -134,6 +147,53 @@ public class RecordAccumulator {
         registerMetrics(metrics, metricGrpName);
     }
 
+    /**
+     * Create a new record accumulator with default partitioner config
+     *
+     * @param logContext The log context used for logging
+     * @param batchSize The size to use when allocating {@link MemoryRecords} instances
+     * @param compression The compression codec for the records
+     * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
+     *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
+     *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
+     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
+     *        exhausting all retries in a short period of time.
+     * @param deliveryTimeoutMs An upper bound on the time to report success or failure on record delivery
+     * @param metrics The metrics
+     * @param metricGrpName The metric group name
+     * @param time The time instance to use
+     * @param apiVersions Request API versions for current connected brokers
+     * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
+     *                           numbers per partition.
+     * @param bufferPool The buffer pool
+     */
+    public RecordAccumulator(LogContext logContext,
+                             int batchSize,
+                             CompressionType compression,
+                             int lingerMs,
+                             long retryBackoffMs,
+                             int deliveryTimeoutMs,
+                             Metrics metrics,
+                             String metricGrpName,
+                             Time time,
+                             ApiVersions apiVersions,
+                             TransactionManager transactionManager,
+                             BufferPool bufferPool) {
+        this(logContext,
+            batchSize,
+            compression,
+            lingerMs,
+            retryBackoffMs,
+            deliveryTimeoutMs,
+            new PartitionerConfig(),
+            metrics,
+            metricGrpName,
+            time,
+            apiVersions,
+            transactionManager,
+            bufferPool);
+    }
+
     private void registerMetrics(Metrics metrics, String metricGrpName) {
         MetricName metricName = metrics.metricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records");
         Measurable waitingThreads = new Measurable() {
@@ -160,91 +220,162 @@ public class RecordAccumulator {
         metrics.addMetric(metricName, availableBytes);
     }
 
+    private void setPartition(AppendCallbacks callbacks, int partition) {
+        if (callbacks != null)
+            callbacks.setPartition(partition);
+    }
+
     /**
      * Add a record to the accumulator, return the append result
      * <p>
      * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
      * <p>
      *
-     * @param tp The topic/partition to which this record is being sent
+     * @param topic The topic to which this record is being sent
+     * @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION
+     *                  if any partition could be used
      * @param timestamp The timestamp of the record
      * @param key The key for the record
      * @param value The value for the record
      * @param headers the Headers for the record
-     * @param callback The user-supplied callback to execute when the request is complete
+     * @param callbacks The callbacks to execute
      * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
      *                        running the partitioner's onNewBatch method before trying to append again
      * @param nowMs The current time, in milliseconds
+     * @param cluster The cluster metadata
      */
-    public RecordAppendResult append(TopicPartition tp,
+    public RecordAppendResult append(String topic,
+                                     int partition,
                                      long timestamp,
                                      byte[] key,
                                      byte[] value,
                                      Header[] headers,
-                                     Callback callback,
+                                     AppendCallbacks callbacks,
                                      long maxTimeToBlock,
                                      boolean abortOnNewBatch,
-                                     long nowMs) throws InterruptedException {
+                                     long nowMs,
+                                     Cluster cluster) throws InterruptedException {
+        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));
+
         // We keep track of the number of appending thread to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
         ByteBuffer buffer = null;
         if (headers == null) headers = Record.EMPTY_HEADERS;
         try {
-            // check if we have an in-progress batch
-            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
-            synchronized (dq) {
-                if (closed)
-                    throw new KafkaException("Producer closed while send in progress");
-                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
-                if (appendResult != null)
-                    return appendResult;
-            }
+            // Loop to retry in case we encounter partitioner's race conditions.
+            while (true) {
+                // If the message doesn't have any partition affinity, so we pick a partition based on the broker
+                // availability and performance.  Note, that here we peek current partition before we hold the
+                // deque lock, so we'll need to make sure that it's not changed while we were waiting for the
+                // deque lock.
+                final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
+                final int effectivePartition;
+                if (partition == RecordMetadata.UNKNOWN_PARTITION) {
+                    partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
+                    effectivePartition = partitionInfo.partition();
+                } else {
+                    partitionInfo = null;
+                    effectivePartition = partition;
+                }
 
-            // we don't have an in-progress record batch try to allocate a new batch
-            if (abortOnNewBatch) {
-                // Return a result that will cause another call to append.
-                return new RecordAppendResult(null, false, false, true);
-            }
+                // Now that we know the effective partition, let the caller know.
+                setPartition(callbacks, effectivePartition);
+
+                // check if we have an in-progress batch
+                Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition hasn't changed and retry.
+                    if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+                        log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
+                                partitionInfo.partition(), topic);
+                        continue;
+                    }
+                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
+                    if (appendResult != null) {
+                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
+                        return appendResult;
+                    }
+                }
 
-            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
-            log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
-            buffer = free.allocate(size, maxTimeToBlock);
+                // we don't have an in-progress record batch try to allocate a new batch
+                if (abortOnNewBatch) {
+                    // Return a result that will cause another call to append.
+                    return new RecordAppendResult(null, false, false, true, 0);
+                }
 
-            // Update the current time in case the buffer allocation blocked above.
-            nowMs = time.milliseconds();
-            synchronized (dq) {
-                // Need to check if producer is closed again after grabbing the dequeue lock.
-                if (closed)
-                    throw new KafkaException("Producer closed while send in progress");
+                if (buffer == null) {
+                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
+                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
+                    log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    buffer = free.allocate(size, maxTimeToBlock);
+                }
 
-                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
-                if (appendResult != null) {
-                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition hasn't changed and retry.
+                    if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+                        log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
+                                partitionInfo.partition(), topic);
+                        continue;
+                    }
+                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+                    // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
+                    if (appendResult.newBatchCreated)
+                        buffer = null;
+                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
                     return appendResult;
                 }
-
-                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
-                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
-                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
-                        callback, nowMs));
-
-                dq.addLast(batch);
-                incomplete.add(batch);
-
-                // Don't deallocate this buffer in the finally block as it's being used in the record batch
-                buffer = null;
-                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
             }
         } finally {
-            if (buffer != null)
-                free.deallocate(buffer);
+            free.deallocate(buffer);
             appendsInProgress.decrementAndGet();
         }
     }
 
+    /**
+     * Append a new batch to the queue
+     *
+     * @param topic The topic
+     * @param partition The partition (cannot be RecordMetadata.UNKNOWN_PARTITION)
+     * @param dq The queue
+     * @param timestamp The timestamp of the record
+     * @param key The key for the record
+     * @param value The value for the record
+     * @param headers the Headers for the record
+     * @param callbacks The callbacks to execute
+     * @param buffer The buffer for the new batch
+     */
+    private RecordAppendResult appendNewBatch(String topic,
+                                              int partition,
+                                              Deque<ProducerBatch> dq,
+                                              long timestamp,
+                                              byte[] key,
+                                              byte[] value,
+                                              Header[] headers,
+                                              AppendCallbacks callbacks,
+                                              ByteBuffer buffer) {
+        assert partition != RecordMetadata.UNKNOWN_PARTITION;
+
+        // Update the current time in case the buffer allocation blocked above.
+        long nowMs = time.milliseconds();
+        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
+        if (appendResult != null) {
+            // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
+            return appendResult;
+        }
+
+        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
+        ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
+        FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
+                callbacks, nowMs));
+
+        dq.addLast(batch);
+        incomplete.add(batch);
+
+        return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes());
+    }
+
     private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
         if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
             throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
@@ -263,13 +394,18 @@ public class RecordAccumulator {
      */
     private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                          Callback callback, Deque<ProducerBatch> deque, long nowMs) {
+        if (closed)
+            throw new KafkaException("Producer closed while send in progress");
         ProducerBatch last = deque.peekLast();
         if (last != null) {
+            int initialBytes = last.estimatedSizeInBytes();
             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
-            if (future == null)
+            if (future == null) {
                 last.closeForRecordAppends();
-            else
-                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
+            } else {
+                int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
+                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes);
+            }
         }
         return null;
     }
@@ -298,19 +434,20 @@ public class RecordAccumulator {
      */
     public List<ProducerBatch> expiredBatches(long now) {
         List<ProducerBatch> expiredBatches = new ArrayList<>();
-        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
-            // expire the batches in the order of sending
-            Deque<ProducerBatch> deque = entry.getValue();
-            synchronized (deque) {
-                while (!deque.isEmpty()) {
-                    ProducerBatch batch = deque.getFirst();
-                    if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) {
-                        deque.poll();
-                        batch.abortRecordAppends();
-                        expiredBatches.add(batch);
-                    } else {
-                        maybeUpdateNextBatchExpiryTime(batch);
-                        break;
+        for (TopicInfo topicInfo : topicInfoMap.values()) {
+            for (Deque<ProducerBatch> deque : topicInfo.batches.values()) {
+                // expire the batches in the order of sending
+                synchronized (deque) {
+                    while (!deque.isEmpty()) {
+                        ProducerBatch batch = deque.getFirst();
+                        if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) {
+                            deque.poll();
+                            batch.abortRecordAppends();
+                            expiredBatches.add(batch);
+                        } else {
+                            maybeUpdateNextBatchExpiryTime(batch);
+                            break;
+                        }
                     }
                 }
             }
@@ -420,38 +557,94 @@ public class RecordAccumulator {
     }
 
     /**
-     * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
-     * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
-     * partition batches.
-     * <p>
-     * A destination node is ready to send data if:
-     * <ol>
-     * <li>There is at least one partition that is not backing off its send
-     * <li><b>and</b> those partitions are not muted (to prevent reordering if
-     *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
-     *   is set to one)</li>
-     * <li><b>and <i>any</i></b> of the following are true</li>
-     * <ul>
-     *     <li>The record set is full</li>
-     *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
-     *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
-     *     are immediately considered ready).</li>
-     *     <li>The accumulator has been closed</li>
-     * </ul>
-     * </ol>
+     * Add the leader to the ready nodes if the batch is ready
+     *
+     * @param nowMs The current time
+     * @param exhausted 'true' is the buffer pool is exhausted
+     * @param part The partition
+     * @param leader The leader for the partition
+     * @param waitedTimeMs How long batch waited
+     * @param backingOff Is backing off
+     * @param full Is batch full
+     * @param nextReadyCheckDelayMs The delay for next check
+     * @param readyNodes The set of ready nodes (to be filled in)
+     * @return The delay for next check
      */
-    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
-        Set<Node> readyNodes = new HashSet<>();
-        long nextReadyCheckDelayMs = Long.MAX_VALUE;
-        Set<String> unknownLeaderTopics = new HashSet<>();
+    private long batchReady(long nowMs, boolean exhausted, TopicPartition part, Node leader,
+                            long waitedTimeMs, boolean backingOff, boolean full,
+                            long nextReadyCheckDelayMs, Set<Node> readyNodes) {
+        if (!readyNodes.contains(leader) && !isMuted(part)) {
+            long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+            boolean expired = waitedTimeMs >= timeToWaitMs;
+            boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
+            boolean sendable = full
+                    || expired
+                    || exhausted
+                    || closed
+                    || flushInProgress()
+                    || transactionCompleting;
+            if (sendable && !backingOff) {
+                readyNodes.add(leader);
+            } else {
+                long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
+                // Note that this results in a conservative estimate since an un-sendable partition may have
+                // a leader that will later be found to have sendable data. However, this is good enough
+                // since we'll just wake up and then sleep again for the remaining time.
+                nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
+            }
+        }
+        return nextReadyCheckDelayMs;
+    }
+
+    /**
+     * Iterate over partitions to see which one have batches ready and collect leaders of those partitions
+     * into the set of ready nodes.  If partition has no leader, add the topic to the set of topics with
+     * no leader.  This function also calculates stats for adaptive partitioning.
+     *
+     * @param cluster The cluster metadata
+     * @param nowMs The current time
+     * @param topic The topic
+     * @param topicInfo The topic info
+     * @param nextReadyCheckDelayMs The delay for next check
+     * @param readyNodes The set of ready nodes (to be filled in)
+     * @param unknownLeaderTopics The set of topics with no leader (to be filled in)
+     * @return The delay for next check
+     */
+    private long partitionReady(Cluster cluster, long nowMs, String topic,
+                                TopicInfo topicInfo,
+                                long nextReadyCheckDelayMs, Set<Node> readyNodes, Set<String> unknownLeaderTopics) {
+        ConcurrentMap<Integer, Deque<ProducerBatch>> batches = topicInfo.batches;
+        // Collect the queue sizes for available partitions to be used in adaptive partitioning.
+        int[] queueSizes = null;
+        int[] partitionIds = null;
+        if (enableAdaptivePartitioning && batches.size() >= cluster.partitionsForTopic(topic).size()) {
+            // We don't do adaptive partitioning until we scheduled at least a batch for all
+            // partitions (i.e. we have the corresponding entries in the batches map), we just
+            // do uniform.  The reason is that we build queue sizes from the batches map,
+            // and if an entry is missing in the batches map, then adaptive partitioning logic
+            // won't know about it and won't switch to it.
+            queueSizes = new int[batches.size()];
+            partitionIds = new int[queueSizes.length];
+        }
 
+        int queueSizesIndex = -1;
         boolean exhausted = this.free.queued() > 0;
-        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
+        for (Map.Entry<Integer, Deque<ProducerBatch>> entry : batches.entrySet()) {
+            TopicPartition part = new TopicPartition(topic, entry.getKey());
+            // Advance queueSizesIndex so that we properly index available
+            // partitions.  Do it here so that it's done for all code paths.
+            Node leader = cluster.leaderFor(part);
+            if (leader != null && queueSizes != null) {
+                ++queueSizesIndex;
+                assert queueSizesIndex < queueSizes.length;
+                partitionIds[queueSizesIndex] = part.partition();
+            }
+
             Deque<ProducerBatch> deque = entry.getValue();
 
-            final ProducerBatch batch;
             final long waitedTimeMs;
             final boolean backingOff;
+            final int dequeSize;
             final boolean full;
 
             // This loop is especially hot with large partition counts.
@@ -463,43 +656,81 @@ public class RecordAccumulator {
             synchronized (deque) {
                 // Deques are often empty in this path, esp with large partition counts,
                 // so we exit early if we can.
-                batch = deque.peekFirst();
+                ProducerBatch batch = deque.peekFirst();
                 if (batch == null) {
                     continue;
                 }
 
                 waitedTimeMs = batch.waitedTimeMs(nowMs);
                 backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
-                full = deque.size() > 1 || batch.isFull();
+                dequeSize = deque.size();
+                full = dequeSize > 1 || batch.isFull();
             }
 
-            TopicPartition part = entry.getKey();
-            Node leader = cluster.leaderFor(part);
             if (leader == null) {
                 // This is a partition for which leader is not known, but messages are available to send.
                 // Note that entries are currently not removed from batches when deque is empty.
                 unknownLeaderTopics.add(part.topic());
-            } else if (!readyNodes.contains(leader) && !isMuted(part)) {
-                long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
-                boolean expired = waitedTimeMs >= timeToWaitMs;
-                boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
-                boolean sendable = full
-                    || expired
-                    || exhausted
-                    || closed
-                    || flushInProgress()
-                    || transactionCompleting;
-                if (sendable && !backingOff) {
-                    readyNodes.add(leader);
-                } else {
-                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
-                    // Note that this results in a conservative estimate since an un-sendable partition may have
-                    // a leader that will later be found to have sendable data. However, this is good enough
-                    // since we'll just wake up and then sleep again for the remaining time.
-                    nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
+            } else {
+                if (queueSizes != null)
+                    queueSizes[queueSizesIndex] = dequeSize;
+                if (partitionAvailabilityTimeoutMs > 0) {
+                    // Check if we want to exclude the partition from the list of available partitions
+                    // if the broker hasn't responded for some time.
+                    NodeLatencyStats nodeLatencyStats = nodeStats.get(leader.id());
+                    if (nodeLatencyStats != null) {
+                        // NOTE: there is no synchronization between reading metrics,
+                        // so we read ready time first to avoid accidentally marking partition
+                        // unavailable if we read while the metrics are being updated.
+                        long readyTimeMs = nodeLatencyStats.readyTimeMs;
+                        if (readyTimeMs - nodeLatencyStats.drainTimeMs > partitionAvailabilityTimeoutMs)
+                            --queueSizesIndex;
+                    }
                 }
+
+                nextReadyCheckDelayMs = batchReady(nowMs, exhausted, part, leader, waitedTimeMs, backingOff,
+                    full, nextReadyCheckDelayMs, readyNodes);
             }
         }
+
+        // We've collected the queue sizes for partitions of this topic, now we can calculate
+        // load stats.  NOTE: the stats are calculated in place, modifying the
+        // queueSizes array.
+        topicInfo.builtInPartitioner.updatePartitionLoadStats(queueSizes, partitionIds, queueSizesIndex + 1);
+        return nextReadyCheckDelayMs;
+    }
+
+    /**
+     * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
+     * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
+     * partition batches.
+     * <p>
+     * A destination node is ready to send data if:
+     * <ol>
+     * <li>There is at least one partition that is not backing off its send
+     * <li><b>and</b> those partitions are not muted (to prevent reordering if
+     *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
+     *   is set to one)</li>
+     * <li><b>and <i>any</i></b> of the following are true</li>
+     * <ul>
+     *     <li>The record set is full</li>
+     *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
+     *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
+     *     are immediately considered ready).</li>
+     *     <li>The accumulator has been closed</li>
+     * </ul>
+     * </ol>
+     */
+    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
+        Set<Node> readyNodes = new HashSet<>();
+        long nextReadyCheckDelayMs = Long.MAX_VALUE;
+        Set<String> unknownLeaderTopics = new HashSet<>();
+        // Go topic by topic so that we can get queue sizes for partitions in a topic and calculate
+        // cumulative frequency table (used in partitioner).
+        for (Map.Entry<String, TopicInfo> topicInfoEntry : this.topicInfoMap.entrySet()) {
+            final String topic = topicInfoEntry.getKey();
+            nextReadyCheckDelayMs = partitionReady(cluster, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
+        }
         return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
     }
 
@@ -507,11 +738,12 @@ public class RecordAccumulator {
      * Check whether there are any batches which haven't been drained
      */
     public boolean hasUndrained() {
-        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
-            Deque<ProducerBatch> deque = entry.getValue();
-            synchronized (deque) {
-                if (!deque.isEmpty())
-                    return true;
+        for (TopicInfo topicInfo : topicInfoMap.values()) {
+            for (Deque<ProducerBatch> deque : topicInfo.batches.values()) {
+                synchronized (deque) {
+                    if (!deque.isEmpty())
+                        return true;
+                }
             }
         }
         return false;
@@ -669,6 +901,36 @@ public class RecordAccumulator {
         return batches;
     }
 
+    public void updateNodeLatencyStats(Integer nodeId, long nowMs, boolean canDrain) {
+        // Don't bother with updating stats if the feature is turned off.
+        if (partitionAvailabilityTimeoutMs <= 0)
+            return;
+
+        // When the sender gets a node (returned by the ready() function) that has data to send
+        // but the node is not ready (and so we cannot drain the data), we only update the
+        // ready time, then the difference would reflect for how long a node wasn't ready
+        // to send the data.  Then we can temporarily remove partitions that are handled by the
+        // node from the list of available partitions so that the partitioner wouldn't pick
+        // this partition.
+        // NOTE: there is no synchronization for metric updates, so drainTimeMs is updated
+        // first to avoid accidentally marking a partition unavailable if the reader gets
+        // values between updates.
+        NodeLatencyStats nodeLatencyStats = nodeStats.computeIfAbsent(nodeId, id -> new NodeLatencyStats(nowMs));
+        if (canDrain)
+            nodeLatencyStats.drainTimeMs = nowMs;
+        nodeLatencyStats.readyTimeMs = nowMs;
+    }
+
+    /* Visible for testing */
+    public NodeLatencyStats getNodeLatencyStats(Integer nodeId) {
+        return nodeStats.get(nodeId);
+    }
+
+    /* Visible for testing */
+    public BuiltInPartitioner getBuiltInPartitioner(String topic) {
+        return topicInfoMap.get(topic).builtInPartitioner;
+    }
+
     /**
      * The earliest absolute time a batch will expire (in milliseconds)
      */
@@ -676,23 +938,20 @@ public class RecordAccumulator {
         return this.nextBatchExpiryTimeMs;
     }
 
-    private Deque<ProducerBatch> getDeque(TopicPartition tp) {
-        return batches.get(tp);
+      /* Visible for testing */
+    public Deque<ProducerBatch> getDeque(TopicPartition tp) {
+        TopicInfo topicInfo = topicInfoMap.get(tp.topic());
+        if (topicInfo == null)
+            return null;
+        return topicInfo.batches.get(tp.partition());
     }
 
     /**
      * Get the deque for the given topic-partition, creating it if necessary.
      */
     private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
-        Deque<ProducerBatch> d = this.batches.get(tp);
-        if (d != null)
-            return d;
-        d = new ArrayDeque<>();
-        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
-        if (previous == null)
-            return d;
-        else
-            return previous;
+        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(), k -> new TopicInfo(logContext, k, batchSize));
+        return topicInfo.batches.computeIfAbsent(tp.partition(), k -> new ArrayDeque<>());
     }
 
     /**
@@ -722,11 +981,6 @@ public class RecordAccumulator {
         return flushesInProgress.get() > 0;
     }
 
-    /* Visible for testing */
-    Map<TopicPartition, Deque<ProducerBatch>> batches() {
-        return Collections.unmodifiableMap(batches);
-    }
-
     /**
      * Initiate the flushing of data from the accumulator...this makes all requests immediately ready
      */
@@ -780,7 +1034,7 @@ public class RecordAccumulator {
         // flag set. We need to do the last abort after no thread was appending in case there was a new
         // batch appended by the last appending thread.
         abortBatches();
-        this.batches.clear();
+        this.topicInfoMap.clear();
     }
 
     /**
@@ -842,6 +1096,32 @@ public class RecordAccumulator {
         this.free.close();
     }
 
+    /**
+     * Partitioner config for built-in partitioner
+     */
+    public static final class PartitionerConfig {
+        private final boolean enableAdaptivePartitioning;
+        private final long partitionAvailabilityTimeoutMs;
+
+        /**
+         * Partitioner config
+         *
+         * @param enableAdaptivePartitioning If it's true, partition switching adapts to broker load, otherwise partition
+         *        switching is random.
+         * @param partitionAvailabilityTimeoutMs If a broker cannot process produce requests from a partition
+         *        for the specified time, the partition is treated by the partitioner as not available.
+         *        If the timeout is 0, this logic is disabled.
+         */
+        public PartitionerConfig(boolean enableAdaptivePartitioning, long partitionAvailabilityTimeoutMs) {
+            this.enableAdaptivePartitioning = enableAdaptivePartitioning;
+            this.partitionAvailabilityTimeoutMs = partitionAvailabilityTimeoutMs;
+        }
+
+        public PartitionerConfig() {
+            this(false, 0);
+        }
+    }
+
     /*
      * Metadata about a record just appended to the record accumulator
      */
@@ -850,15 +1130,32 @@ public class RecordAccumulator {
         public final boolean batchIsFull;
         public final boolean newBatchCreated;
         public final boolean abortForNewBatch;
+        public final int appendedBytes;
 
-        public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated, boolean abortForNewBatch) {
+        public RecordAppendResult(FutureRecordMetadata future,
+                                  boolean batchIsFull,
+                                  boolean newBatchCreated,
+                                  boolean abortForNewBatch,
+                                  int appendedBytes) {
             this.future = future;
             this.batchIsFull = batchIsFull;
             this.newBatchCreated = newBatchCreated;
             this.abortForNewBatch = abortForNewBatch;
+            this.appendedBytes = appendedBytes;
         }
     }
 
+    /*
+     * The callbacks passed into append
+     */
+    public interface AppendCallbacks extends Callback {
+        /**
+         * Called to set partition (when append is called, partition may not be calculated yet).
+         * @param partition The partition
+         */
+        void setPartition(int partition);
+    }
+
     /*
      * The set of nodes that have at least one complete record batch in the accumulator
      */
@@ -873,4 +1170,30 @@ public class RecordAccumulator {
             this.unknownLeaderTopics = unknownLeaderTopics;
         }
     }
+
+    /**
+     * Per topic info.
+     */
+    private static class TopicInfo {
+        public final ConcurrentMap<Integer /*partition*/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
+        public final BuiltInPartitioner builtInPartitioner;
+
+        public TopicInfo(LogContext logContext, String topic, int stickyBatchSize) {
+            builtInPartitioner = new BuiltInPartitioner(logContext, topic, stickyBatchSize);
+        }
+    }
+
+    /**
+     * Node latency stats for each node that are used for adaptive partition distribution
+     * Visible for testing
+     */
+    public final static class NodeLatencyStats {
+        volatile public long readyTimeMs;  // last time the node had batches ready to send
+        volatile public long drainTimeMs;  // last time the node was able to drain batches
+
+        NodeLatencyStats(long nowMs) {
+            readyTimeMs = nowMs;
+            drainTimeMs = nowMs;
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 2f55e62912..55eb6c7be2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -352,8 +352,16 @@ public class Sender implements Runnable {
         while (iter.hasNext()) {
             Node node = iter.next();
             if (!this.client.ready(node, now)) {
+                // Update just the readyTimeMs of the latency stats, so that it moves forward
+                // every time the batch is ready (then the difference between readyTimeMs and
+                // drainTimeMs would represent how long data is waiting for the node).
+                this.accumulator.updateNodeLatencyStats(node.id(), now, false);
                 iter.remove();
                 notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
+            } else {
+                // Update both readyTimeMs and drainTimeMs, this would "reset" the node
+                // latency.
+                this.accumulator.updateNodeLatencyStats(node.id(), now, true);
             }
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 3e3faeaadf..af71e3ecd3 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1038,7 +1038,7 @@ public final class Utils {
      *
      * Note: changing this method in the future will possibly cause partition selection not to be
      * compatible with the existing messages already placed on a partition since it is used
-     * in producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}
+     * in producer's partition selection logic {@link org.apache.kafka.clients.producer.KafkaProducer}
      *
      * @param number a given number
      * @return a positive number.
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index ce01620803..dc7db382a6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -1990,6 +1990,7 @@ public class KafkaProducerTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testPartitionAddedToTransactionAfterFullBatchRetry() throws Exception {
         StringSerializer serializer = new StringSerializer();
@@ -2053,21 +2054,28 @@ public class KafkaProducerTest {
         )).thenReturn(initialSelectedPartition.partition());
 
         when(ctx.accumulator.append(
-            eq(initialSelectedPartition),
-            eq(timestamp),
-            eq(serializedKey),
-            eq(serializedValue),
-            eq(Record.EMPTY_HEADERS),
-            any(Callback.class),
+            eq(initialSelectedPartition.topic()),            // 0
+            eq(initialSelectedPartition.partition()),        // 1
+            eq(timestamp),                                   // 2
+            eq(serializedKey),                               // 3
+            eq(serializedValue),                             // 4
+            eq(Record.EMPTY_HEADERS),                        // 5
+            any(RecordAccumulator.AppendCallbacks.class),    // 6 <--
             anyLong(),
             eq(true),
-            anyLong()
-        )).thenReturn(new RecordAccumulator.RecordAppendResult(
-            futureRecordMetadata,
-            false,
-            false,
-            false
-        ));
+            anyLong(),
+            any()
+        )).thenAnswer(invocation -> {
+            RecordAccumulator.AppendCallbacks callbacks =
+                (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
+            callbacks.setPartition(initialSelectedPartition.partition());
+            return new RecordAccumulator.RecordAppendResult(
+                futureRecordMetadata,
+                false,
+                false,
+                false,
+                0);
+        });
 
         return futureRecordMetadata;
     }
@@ -2104,38 +2112,52 @@ public class KafkaProducerTest {
           .thenReturn(retrySelectedPartition.partition());
 
         when(ctx.accumulator.append(
-            eq(initialSelectedPartition),
-            eq(timestamp),
-            eq(serializedKey),
-            eq(serializedValue),
-            eq(Record.EMPTY_HEADERS),
-            any(Callback.class),
+            eq(initialSelectedPartition.topic()),            // 0
+            eq(initialSelectedPartition.partition()),        // 1
+            eq(timestamp),                                   // 2
+            eq(serializedKey),                               // 3
+            eq(serializedValue),                             // 4
+            eq(Record.EMPTY_HEADERS),                        // 5
+            any(RecordAccumulator.AppendCallbacks.class),    // 6 <--
             anyLong(),
             eq(true), // abortOnNewBatch
-            anyLong()
-        )).thenReturn(new RecordAccumulator.RecordAppendResult(
-            null,
-            false,
-            false,
-            true
-        ));
+            anyLong(),
+            any()
+        )).thenAnswer(invocation -> {
+            RecordAccumulator.AppendCallbacks callbacks =
+                (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
+            callbacks.setPartition(initialSelectedPartition.partition());
+            return new RecordAccumulator.RecordAppendResult(
+                null,
+                false,
+                false,
+                true,
+                0);
+        });
 
         when(ctx.accumulator.append(
-            eq(retrySelectedPartition),
-            eq(timestamp),
-            eq(serializedKey),
-            eq(serializedValue),
-            eq(Record.EMPTY_HEADERS),
-            any(Callback.class),
+            eq(retrySelectedPartition.topic()),              // 0
+            eq(retrySelectedPartition.partition()),          // 1
+            eq(timestamp),                                   // 2
+            eq(serializedKey),                               // 3
+            eq(serializedValue),                             // 4
+            eq(Record.EMPTY_HEADERS),                        // 5
+            any(RecordAccumulator.AppendCallbacks.class),    // 6 <--
             anyLong(),
             eq(false), // abortOnNewBatch
-            anyLong()
-        )).thenReturn(new RecordAccumulator.RecordAppendResult(
-            futureRecordMetadata,
-            false,
-            true,
-            false
-        ));
+            anyLong(),
+            any()
+        )).thenAnswer(invocation -> {
+            RecordAccumulator.AppendCallbacks callbacks =
+                (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
+            callbacks.setPartition(retrySelectedPartition.partition());
+            return new RecordAccumulator.RecordAppendResult(
+                futureRecordMetadata,
+                false,
+                true,
+                false,
+                0);
+        });
 
         return futureRecordMetadata;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index ca14ab0fda..8c7884bd77 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
@@ -85,7 +84,7 @@ public class MockProducerTest {
         PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
         Cluster cluster = new Cluster(null, new ArrayList<>(0), asList(partitionInfo0, partitionInfo1),
                 Collections.emptySet(), Collections.emptySet());
-        MockProducer<String, String> producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
+        MockProducer<String, String> producer = new MockProducer<>(cluster, true, new StringSerializer(), new StringSerializer());
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "value");
         Future<RecordMetadata> metadata = producer.send(record);
         assertEquals(1, metadata.get().partition(), "Partition should be correct");
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
index 0014bf8daa..f548407171 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
@@ -39,6 +39,7 @@ public class UniformStickyPartitionerTest {
     private final static String TOPIC_A = "TOPIC_A";
     private final static String TOPIC_B = "TOPIC_B";
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testRoundRobinWithUnavailablePartitions() {
         // Intentionally make the partition list not in partition order to test the edge
@@ -77,6 +78,7 @@ public class UniformStickyPartitionerTest {
         assertEquals(countForPart0, countForPart2, "The distribution between two available partitions should be even");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testRoundRobinWithKeyBytes() throws InterruptedException {
         List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
@@ -140,7 +142,8 @@ public class UniformStickyPartitionerTest {
         assertEquals(30, partitionCount.get(oldPart).intValue());
         assertEquals(60, partitionCount.get(newPart).intValue());
     }
-    
+
+    @SuppressWarnings("deprecation")
     @Test
     public void testRoundRobinWithNullKeyBytes() throws InterruptedException {
         List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java
new file mode 100644
index 0000000000..734aedc483
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class BuiltInPartitionerTest {
+    private final static Node[] NODES = new Node[] {
+        new Node(0, "localhost", 99),
+        new Node(1, "localhost", 100),
+        new Node(2, "localhost", 101),
+        new Node(11, "localhost", 102)
+    };
+    final static String TOPIC_A = "topicA";
+    final static String TOPIC_B = "topicB";
+    final static String TOPIC_C = "topicC";
+    final LogContext logContext = new LogContext();
+
+    @AfterEach
+    public void tearDown() {
+        BuiltInPartitioner.mockRandom = null;
+    }
+
+    @Test
+    public void testStickyPartitioning() {
+        List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
+            new PartitionInfo(TOPIC_A, 1, NODES[1], NODES, NODES),
+            new PartitionInfo(TOPIC_A, 2, NODES[2], NODES, NODES),
+            new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES)
+        );
+        Cluster testCluster = new Cluster("clusterId", asList(NODES), allPartitions,
+            Collections.emptySet(), Collections.emptySet());
+
+        // Create partitions with "sticky" batch size to accommodate 3 records.
+        BuiltInPartitioner builtInPartitionerA = new BuiltInPartitioner(logContext, TOPIC_A, 3);
+
+        // Test the partition is not switched until sticky batch size is reached.
+        // Mock random number generator with just sequential integer.
+        AtomicInteger mockRandom = new AtomicInteger();
+        BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);
+
+        BuiltInPartitioner.StickyPartitionInfo partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster);
+        int partA = partitionInfo.partition();
+        builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster);
+
+        partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster);
+        assertEquals(partA, partitionInfo.partition());
+        builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster);
+
+        partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster);
+        assertEquals(partA, partitionInfo.partition());
+        builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster);
+
+        // After producing 3 records, partition must've switched.
+        assertNotEquals(partA, builtInPartitionerA.peekCurrentPartitionInfo(testCluster).partition());
+
+        // Check that switching works even when there is one partition.
+        BuiltInPartitioner builtInPartitionerB = new BuiltInPartitioner(logContext, TOPIC_B, 1);
+        for (int c = 10; c-- > 0; ) {
+            partitionInfo = builtInPartitionerB.peekCurrentPartitionInfo(testCluster);
+            assertEquals(0, partitionInfo.partition());
+            builtInPartitionerB.updatePartitionInfo(partitionInfo, 1, testCluster);
+        }
+    }
+
+    @Test
+    public void unavailablePartitionsTest() {
+        // Partition 1 in topic A, partition 0 in topic B and partition 0 in topic C are unavailable partitions.
+        List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
+            new PartitionInfo(TOPIC_A, 1, null, NODES, NODES),
+            new PartitionInfo(TOPIC_A, 2, NODES[2], NODES, NODES),
+            new PartitionInfo(TOPIC_B, 0, null, NODES, NODES),
+            new PartitionInfo(TOPIC_B, 1, NODES[0], NODES, NODES),
+            new PartitionInfo(TOPIC_C, 0, null, NODES, NODES)
+        );
+
+        Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
+            Collections.emptySet(), Collections.emptySet());
+
+        // Create partitions with "sticky" batch size to accommodate 1 record.
+        BuiltInPartitioner builtInPartitionerA = new BuiltInPartitioner(logContext, TOPIC_A, 1);
+
+        // Assure we never choose partition 1 because it is unavailable.
+        BuiltInPartitioner.StickyPartitionInfo partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster);
+        int partA = partitionInfo.partition();
+        builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster);
+
+        boolean foundAnotherPartA = false;
+        assertNotEquals(1, partA);
+        for (int aPartitions = 0; aPartitions < 100; aPartitions++) {
+            partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster);
+            int anotherPartA = partitionInfo.partition();
+            builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster);
+
+            assertNotEquals(1, anotherPartA);
+            foundAnotherPartA = foundAnotherPartA || anotherPartA != partA;
+        }
+        assertTrue(foundAnotherPartA, "Expected to find partition other than " + partA);
+
+        BuiltInPartitioner builtInPartitionerB = new BuiltInPartitioner(logContext, TOPIC_B, 1);
+        // Assure we always choose partition 1 for topic B.
+        partitionInfo = builtInPartitionerB.peekCurrentPartitionInfo(testCluster);
+        int partB = partitionInfo.partition();
+        builtInPartitionerB.updatePartitionInfo(partitionInfo, 1, testCluster);
+
+        assertEquals(1, partB);
+        for (int bPartitions = 0; bPartitions < 100; bPartitions++) {
+            partitionInfo = builtInPartitionerB.peekCurrentPartitionInfo(testCluster);
+            assertEquals(1, partitionInfo.partition());
+            builtInPartitionerB.updatePartitionInfo(partitionInfo, 1, testCluster);
+        }
+
+        // Assure that we still choose the partition when there are no partitions available.
+        BuiltInPartitioner builtInPartitionerC = new BuiltInPartitioner(logContext, TOPIC_C, 1);
+        partitionInfo = builtInPartitionerC.peekCurrentPartitionInfo(testCluster);
+        int partC = partitionInfo.partition();
+        builtInPartitionerC.updatePartitionInfo(partitionInfo, 1, testCluster);
+        assertEquals(0, partC);
+
+        partitionInfo = builtInPartitionerC.peekCurrentPartitionInfo(testCluster);
+        partC = partitionInfo.partition();
+        assertEquals(0, partC);
+    }
+
+    @Test
+    public void adaptivePartitionsTest() {
+        // Mock random number generator with just sequential integer.
+        AtomicInteger mockRandom = new AtomicInteger();
+        BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);
+
+        BuiltInPartitioner builtInPartitioner = new BuiltInPartitioner(logContext, TOPIC_A, 1);
+
+        // Simulate partition queue sizes.
+        int[] queueSizes = {5, 0, 3, 0, 1};
+        int[] partitionIds = new int[queueSizes.length];
+        int[] expectedFrequencies = new int[queueSizes.length];
+        List<PartitionInfo> allPartitions = new ArrayList<>();
+        for (int i = 0; i < partitionIds.length; i++) {
+            partitionIds[i] = i;
+            allPartitions.add(new PartitionInfo(TOPIC_A, i, NODES[i % NODES.length], NODES, NODES));
+            expectedFrequencies[i] = 6 - queueSizes[i];  // 6 is max(queueSizes) + 1
+        }
+
+        builtInPartitioner.updatePartitionLoadStats(queueSizes, partitionIds, queueSizes.length);
+
+        Cluster testCluster = new Cluster("clusterId", asList(NODES), allPartitions,
+            Collections.emptySet(), Collections.emptySet());
+
+        // Issue a certain number of partition calls to validate that the partitions would be
+        // distributed with frequencies that are reciprocal to the queue sizes.  The number of
+        // iterations is defined by the last element of the cumulative frequency table which is
+        // the sum of all frequencies.  We do 2 cycles, just so it's more than 1.
+        final int numberOfCycles = 2;
+        int numberOfIterations = builtInPartitioner.loadStatsRangeEnd() * numberOfCycles;
+        int[] frequencies = new int[queueSizes.length];
+
+        for (int i = 0; i < numberOfIterations; i++) {
+            BuiltInPartitioner.StickyPartitionInfo partitionInfo = builtInPartitioner.peekCurrentPartitionInfo(testCluster);
+            ++frequencies[partitionInfo.partition()];
+            builtInPartitioner.updatePartitionInfo(partitionInfo, 1, testCluster);
+        }
+
+        // Verify that frequencies are reciprocal of queue sizes.
+        for (int i = 0; i < frequencies.length; i++) {
+            assertEquals(expectedFrequencies[i] * numberOfCycles, frequencies[i],
+                "Partition " + i + " was chosen " + frequencies[i] + " times");
+        }
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index a55e5d2220..e250748643 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -42,6 +42,7 @@ public class DefaultPartitionerTest {
 
     @Test
     public void testKeyPartitionIsStable() {
+        @SuppressWarnings("deprecation")
         final Partitioner partitioner = new DefaultPartitioner();
         final Cluster cluster = new Cluster("clusterId", asList(NODES), PARTITIONS,
             Collections.<String>emptySet(), Collections.<String>emptySet());
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index d5b89ea864..cf991de338 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -114,18 +114,18 @@ public class RecordAccumulatorTest {
                 Collections.emptySet(), Collections.emptySet());
 
         //  initial data
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+        accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+        accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+        accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
 
         // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
         Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
         verifyTopicPartitionInBatches(batches1, tp1, tp3);
 
         // add record for tp1, tp3
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+        accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
 
         // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
         // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
@@ -137,18 +137,18 @@ public class RecordAccumulatorTest {
         verifyTopicPartitionInBatches(batches3, tp1, tp3);
 
         // add record for tp2, tp3, tp4 and mute the tp4
-        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+        accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+        accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         accum.mutePartition(tp4);
         // drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
         Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
         verifyTopicPartitionInBatches(batches4, tp2, tp3);
 
         // add record for tp1, tp2, tp3, and unmute tp4
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+        accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+        accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         accum.unmutePartition(tp4);
         // set maxSize as a max value, so that the all partitions in 2 nodes should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4]
         Map<Integer, List<ProducerBatch>> batches5 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
@@ -182,8 +182,8 @@ public class RecordAccumulatorTest {
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             // append to the first batch
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-            Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
+            accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+            Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1);
             assertEquals(1, partitionBatches.size());
 
             ProducerBatch batch = partitionBatches.peekFirst();
@@ -193,8 +193,8 @@ public class RecordAccumulatorTest {
 
         // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
 
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-        Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+        Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1);
         assertEquals(2, partitionBatches.size());
         Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
         assertTrue(partitionBatchesIterator.next().isWritable());
@@ -228,10 +228,10 @@ public class RecordAccumulatorTest {
         byte[] value = new byte[2 * batchSize];
         RecordAccumulator accum = createTestRecordAccumulator(
                 batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         assertEquals(Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes, "Our partition's leader should be ready");
 
-        Deque<ProducerBatch> batches = accum.batches().get(tp1);
+        Deque<ProducerBatch> batches = accum.getDeque(tp1);
         assertEquals(1, batches.size());
         ProducerBatch producerBatch = batches.peek();
         List<MutableRecordBatch> recordBatches = TestUtils.toList(producerBatch.records().batches());
@@ -266,10 +266,10 @@ public class RecordAccumulatorTest {
 
         RecordAccumulator accum = createTestRecordAccumulator(
                 batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         assertEquals(Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes, "Our partition's leader should be ready");
 
-        Deque<ProducerBatch> batches = accum.batches().get(tp1);
+        Deque<ProducerBatch> batches = accum.getDeque(tp1);
         assertEquals(1, batches.size());
         ProducerBatch producerBatch = batches.peek();
         List<MutableRecordBatch> recordBatches = TestUtils.toList(producerBatch.records().batches());
@@ -290,7 +290,7 @@ public class RecordAccumulatorTest {
         int lingerMs = 10;
         RecordAccumulator accum = createTestRecordAccumulator(
                 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs);
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         assertEquals(0, accum.ready(cluster, time.milliseconds()).readyNodes.size(), "No partitions should be ready");
         time.sleep(10);
         assertEquals(Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes, "Our partition's leader should be ready");
@@ -313,7 +313,7 @@ public class RecordAccumulatorTest {
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+                accum.append(tp.topic(), tp.partition(), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         }
         assertEquals(Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes, "Partition's leader should be ready");
 
@@ -335,7 +335,7 @@ public class RecordAccumulatorTest {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+                            accum.append(topic, i % numParts, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -379,7 +379,7 @@ public class RecordAccumulatorTest {
 
         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+            accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals(0, result.readyNodes.size(), "No nodes should be ready.");
         assertEquals(lingerMs, result.nextReadyCheckDelayMs, "Next check time should be the linger time");
@@ -388,14 +388,14 @@ public class RecordAccumulatorTest {
 
         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+            accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals(0, result.readyNodes.size(), "No nodes should be ready.");
         assertEquals(lingerMs / 2, result.nextReadyCheckDelayMs, "Next check time should be defined by node1, half remaining linger time");
 
         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+            accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready");
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -417,7 +417,7 @@ public class RecordAccumulatorTest {
             new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
 
         long now = time.milliseconds();
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready");
         Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@@ -429,7 +429,7 @@ public class RecordAccumulatorTest {
         accum.reenqueue(batches.get(0).get(0), now);
 
         // Put message for partition 1 into accumulator
-        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready");
 
@@ -455,7 +455,7 @@ public class RecordAccumulatorTest {
                 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
 
         for (int i = 0; i < 100; i++) {
-            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+            accum.append(topic, i % 3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
             assertTrue(accum.hasIncomplete());
         }
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -493,7 +493,7 @@ public class RecordAccumulatorTest {
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = createTestRecordAccumulator(
             4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Integer.MAX_VALUE);
-        accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, 0, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
 
         accum.beginFlush();
         assertTrue(accum.flushInProgress());
@@ -514,15 +514,19 @@ public class RecordAccumulatorTest {
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
         final RecordAccumulator accum = createTestRecordAccumulator(
             128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
-        class TestCallback implements Callback {
+        class TestCallback implements RecordAccumulator.AppendCallbacks {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
                 assertTrue(exception.getMessage().equals("Producer is closed forcefully."));
                 numExceptionReceivedInCallback.incrementAndGet();
             }
+
+            @Override
+            public void setPartition(int partition) {
+            }
         }
         for (int i = 0; i < numRecords; i++)
-            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds());
+            accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertFalse(result.readyNodes.isEmpty());
         Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
@@ -555,15 +559,19 @@ public class RecordAccumulatorTest {
                 128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
         final KafkaException cause = new KafkaException();
 
-        class TestCallback implements Callback {
+        class TestCallback implements RecordAccumulator.AppendCallbacks {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
                 assertEquals(cause, exception);
                 numExceptionReceivedInCallback.incrementAndGet();
             }
+
+            @Override
+            public void setPartition(int partition) {
+            }
         }
         for (int i = 0; i < numRecords; i++)
-            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds());
+            accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertFalse(result.readyNodes.isEmpty());
         Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
@@ -602,7 +610,7 @@ public class RecordAccumulatorTest {
         for (Boolean mute: muteStates) {
             if (time.milliseconds() < System.currentTimeMillis())
                 time.setCurrentTimeMs(System.currentTimeMillis());
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+            accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
             assertEquals(0, accum.ready(cluster, time.milliseconds()).readyNodes.size(), "No partition should be ready.");
 
             time.sleep(lingerMs);
@@ -651,11 +659,11 @@ public class RecordAccumulatorTest {
 
         // Test batches not in retry
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+            accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
             assertEquals(0, accum.ready(cluster, time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
         }
         // Make the batches ready due to batch full
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster);
         Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready");
         // Advance the clock to expire the batch.
@@ -685,7 +693,7 @@ public class RecordAccumulatorTest {
 
         // Test batches in retry.
         // Create a retried batch
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster);
         time.sleep(lingerMs);
         readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready");
@@ -709,7 +717,7 @@ public class RecordAccumulatorTest {
         assertEquals(0, expiredBatches.size(), "All batches should have been expired.");
 
         // Test that when being throttled muted batches are expired before the throttle time is over.
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster);
         time.sleep(lingerMs);
         readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready");
@@ -742,7 +750,7 @@ public class RecordAccumulatorTest {
                 batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, 10);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+            accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
             assertEquals(0, accum.ready(cluster, now).readyNodes.size(), "No partitions should be ready.");
         }
         time.sleep(2000);
@@ -785,7 +793,7 @@ public class RecordAccumulatorTest {
             CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
             new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
         assertThrows(UnsupportedVersionException.class,
-            () -> accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds()));
+            () -> accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster));
     }
 
     @Test
@@ -808,10 +816,10 @@ public class RecordAccumulatorTest {
         // Initially, the transaction is still in progress, so we should respect the linger.
         Mockito.when(transactionManager.isCompleting()).thenReturn(false);
 
-        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
-            false, time.milliseconds());
-        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
-            false, time.milliseconds());
+        accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
+            false, time.milliseconds(), cluster);
+        accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
+            false, time.milliseconds(), cluster);
         assertTrue(accumulator.hasUndrained());
 
         RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(cluster, time.milliseconds());
@@ -930,7 +938,7 @@ public class RecordAccumulatorTest {
                 int dice = random.nextInt(100);
                 byte[] value = (dice < goodCompRatioPercentage) ?
                         bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100);
-                accum.append(tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
+                accum.append(topic, partition1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster);
                 BatchDrainedResult result = completeOrSplitBatches(accum, batchSize);
                 numSplit += result.numSplit;
                 numBatches += result.numBatches;
@@ -953,7 +961,7 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = createTestRecordAccumulator(
             batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
 
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
         assertTrue(drained.isEmpty());
@@ -968,7 +976,7 @@ public class RecordAccumulatorTest {
         //assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
 
         // Queue another batch and advance clock such that batch expiry time is earlier than request timeout.
-        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         time.sleep(lingerMs * 4);
 
         // Now drain and check that accumulator picked up the drained batch because its expiry is soon.
@@ -993,7 +1001,7 @@ public class RecordAccumulatorTest {
 
         // Test batches in retry.
         for (Boolean mute : muteStates) {
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
+            accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster);
             time.sleep(lingerMs);
             readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
             assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready");
@@ -1015,6 +1023,7 @@ public class RecordAccumulatorTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testStickyBatches() throws Exception {
         long now = time.milliseconds();
@@ -1024,24 +1033,23 @@ public class RecordAccumulatorTest {
 
         Partitioner partitioner = new DefaultPartitioner();
         RecordAccumulator accum = createTestRecordAccumulator(3200,
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10);
+            batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10);
         int expectedAppends = expectedNumAppendsNoKey(batchSize);
 
         // Create first batch
         int partition = partitioner.partition(topic, null, null, "value", value, cluster);
-        TopicPartition tp = new TopicPartition(topic, partition);
-        accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         int appends = 1;
 
         boolean switchPartition = false;
         while (!switchPartition) {
             // Append to the first batch
             partition = partitioner.partition(topic, null, null, "value", value, cluster);
-            tp = new TopicPartition(topic, partition);
-            RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds());
-            Deque<ProducerBatch> partitionBatches1 = accum.batches().get(tp1);
-            Deque<ProducerBatch> partitionBatches2 = accum.batches().get(tp2);
-            Deque<ProducerBatch> partitionBatches3 = accum.batches().get(tp3);
+            RecordAccumulator.RecordAppendResult result = accum.append(topic, partition, 0L, null,
+                value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster);
+            Deque<ProducerBatch> partitionBatches1 = accum.getDeque(tp1);
+            Deque<ProducerBatch> partitionBatches2 = accum.getDeque(tp2);
+            Deque<ProducerBatch> partitionBatches3 = accum.getDeque(tp3);
             int numBatches = (partitionBatches1 == null ? 0 : partitionBatches1.size()) + (partitionBatches2 == null ? 0 : partitionBatches2.size()) + (partitionBatches3 == null ? 0 : partitionBatches3.size());
             // Only one batch is created because the partition is sticky.
             assertEquals(1, numBatches);
@@ -1062,18 +1070,17 @@ public class RecordAccumulatorTest {
         // KafkaProducer would call this method in this case, make second batch
         partitioner.onNewBatch(topic, cluster, partition);
         partition = partitioner.partition(topic, null, null, "value", value, cluster);
-        tp = new TopicPartition(topic, partition);
-        accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(topic, partition, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         appends++;
 
         // These appends all go into the second batch
         while (!switchPartition) {
             partition = partitioner.partition(topic, null, null, "value", value, cluster);
-            tp = new TopicPartition(topic, partition);
-            RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds());
-            Deque<ProducerBatch> partitionBatches1 = accum.batches().get(tp1);
-            Deque<ProducerBatch> partitionBatches2 = accum.batches().get(tp2);
-            Deque<ProducerBatch> partitionBatches3 = accum.batches().get(tp3);
+            RecordAccumulator.RecordAppendResult result = accum.append(topic, partition, 0L, null, value,
+                Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster);
+            Deque<ProducerBatch> partitionBatches1 = accum.getDeque(tp1);
+            Deque<ProducerBatch> partitionBatches2 = accum.getDeque(tp2);
+            Deque<ProducerBatch> partitionBatches3 = accum.getDeque(tp3);
             int numBatches = (partitionBatches1 == null ? 0 : partitionBatches1.size()) + (partitionBatches2 == null ? 0 : partitionBatches2.size()) + (partitionBatches3 == null ? 0 : partitionBatches3.size());
             // Only two batches because the new partition is also sticky.
             assertEquals(2, numBatches);
@@ -1089,6 +1096,158 @@ public class RecordAccumulatorTest {
         assertEquals(appends, 2 * expectedAppends);
     }
 
+    @Test
+    public void testUniformBuiltInPartitioner() throws Exception {
+
+        try {
+            // Mock random number generator with just sequential integer.
+            AtomicInteger mockRandom = new AtomicInteger();
+            BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);
+
+            long totalSize = 1024 * 1024;
+            int batchSize = 128;  // note that this is also a "sticky" limit for the partitioner
+            RecordAccumulator accum = createTestRecordAccumulator(batchSize, totalSize, CompressionType.NONE, 0);
+
+            // Set up callbacks so that we know what partition is chosen.
+            final AtomicInteger partition = new AtomicInteger(RecordMetadata.UNKNOWN_PARTITION);
+            RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() {
+                @Override
+                public void setPartition(int p) {
+                    partition.set(p);
+                }
+
+                @Override
+                public void onCompletion(RecordMetadata metadata, Exception exception) {
+
+                }
+            };
+
+            // Produce small record, we should switch to first partition.
+            accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, value, Record.EMPTY_HEADERS,
+                callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
+            assertEquals(partition1, partition.get());
+            assertEquals(1, mockRandom.get());
+
+            // Produce large record, we should exceed "sticky" limit, but produce to this partition
+            // as we switch after the "sticky" limit is exceeded.  The partition is switched after
+            // we produce.
+            byte[] largeValue = new byte[batchSize];
+            accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
+                callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
+            assertEquals(partition1, partition.get());
+            assertEquals(2, mockRandom.get());
+
+            // Produce large record, we should switch to next partition.
+            accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
+                callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
+            assertEquals(partition2, partition.get());
+            assertEquals(3, mockRandom.get());
+
+            // Produce large record, we should switch to next partition.
+            accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
+                callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
+            assertEquals(partition3, partition.get());
+            assertEquals(4, mockRandom.get());
+
+            // Produce large record, we should switch to first partition again.
+            accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
+                callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
+            assertEquals(partition1, partition.get());
+            assertEquals(5, mockRandom.get());
+        } finally {
+            BuiltInPartitioner.mockRandom = null;
+        }
+    }
+
+    @Test
+    public void testAdaptiveBuiltInPartitioner() throws Exception {
+        try {
+            // Mock random number generator with just sequential integer.
+            AtomicInteger mockRandom = new AtomicInteger();
+            BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);
+
+            // Create accumulator with partitioner config to enable adaptive partitioning.
+            RecordAccumulator.PartitionerConfig config = new RecordAccumulator.PartitionerConfig(true, 100);
+            long totalSize = 1024 * 1024;
+            int batchSize = 128;
+            RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
+                3200, config, metrics, "producer-metrics", time, new ApiVersions(), null,
+                new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
+
+            byte[] largeValue = new byte[batchSize];
+            int[] queueSizes = {1, 7, 2};
+            int[] expectedFrequencies = new int[queueSizes.length];
+            for (int i = 0; i < queueSizes.length; i++) {
+                expectedFrequencies[i] = 8 - queueSizes[i];  // 8 is max(queueSizes) + 1
+                for (int c = queueSizes[i]; c-- > 0; ) {
+                    // Add large records to each partition, so that each record creates a batch.
+                    accum.append(topic, i, 0L, null, largeValue, Record.EMPTY_HEADERS,
+                        null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+                }
+                assertEquals(queueSizes[i], accum.getDeque(new TopicPartition(topic, i)).size());
+            }
+
+            // Let the accumulator generate the probability tables.
+            accum.ready(cluster, time.milliseconds());
+
+            // Set up callbacks so that we know what partition is chosen.
+            final AtomicInteger partition = new AtomicInteger(RecordMetadata.UNKNOWN_PARTITION);
+            RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() {
+                @Override
+                public void setPartition(int p) {
+                    partition.set(p);
+                }
+
+                @Override
+                public void onCompletion(RecordMetadata metadata, Exception exception) {
+
+                }
+            };
+
+            // Prime built-in partitioner so that it'd switch on every record, as switching only
+            // happens after the "sticky" limit is exceeded.
+            accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
+                callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
+
+            // Issue a certain number of partition calls to validate that the partitions would be
+            // distributed with frequencies that are reciprocal to the queue sizes.  The number of
+            // iterations is defined by the last element of the cumulative frequency table which is
+            // the sum of all frequencies.  We do 2 cycles, just so it's more than 1.
+            final int numberOfCycles = 2;
+            int numberOfIterations = accum.getBuiltInPartitioner(topic).loadStatsRangeEnd() * numberOfCycles;
+            int[] frequencies = new int[queueSizes.length];
+
+            for (int i = 0; i < numberOfIterations; i++) {
+                accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
+                    callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
+                ++frequencies[partition.get()];
+            }
+
+            // Verify that frequencies are reciprocal of queue sizes.
+            for (int i = 0; i < frequencies.length; i++) {
+                assertEquals(expectedFrequencies[i] * numberOfCycles, frequencies[i],
+                    "Partition " + i + " was chosen " + frequencies[i] + " times");
+            }
+
+            // Test that partitions residing on high-latency nodes don't get switched to.
+            accum.updateNodeLatencyStats(0, time.milliseconds() - 200, true);
+            accum.updateNodeLatencyStats(0, time.milliseconds(), false);
+            accum.ready(cluster, time.milliseconds());
+
+            // Do one append, because partition gets switched after append.
+            accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
+                    callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
+
+            for (int c = 10; c-- > 0; ) {
+                accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
+                    callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
+                assertEquals(partition3, partition.get());
+            }
+        } finally {
+            BuiltInPartitioner.mockRandom = null;
+        }
+    }
+
     private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSize, int numRecords)
         throws InterruptedException {
         Random random = new Random();
@@ -1098,7 +1257,7 @@ public class RecordAccumulatorTest {
         CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
         // Append 20 records of 100 bytes size with poor compression ratio should make the batch too big.
         for (int i = 0; i < numRecords; i++) {
-            accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
+            accum.append(topic, partition1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster);
         }
 
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 60e9f06186..3d972b3eb2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.InvalidRecordException;
@@ -467,22 +466,30 @@ public class SenderTest {
         final byte[] key = "key".getBytes();
         final byte[] value = "value".getBytes();
         final long maxBlockTimeMs = 1000;
-        Callback callback = (metadata, exception) -> {
-            if (exception instanceof TimeoutException) {
-                expiryCallbackCount.incrementAndGet();
-                try {
-                    accumulator.append(tp1, 0L, key, value,
-                        Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-                } catch (InterruptedException e) {
-                    throw new RuntimeException("Unexpected interruption", e);
-                }
-            } else if (exception != null)
-                unexpectedException.compareAndSet(null, exception);
+        Cluster cluster = TestUtils.singletonCluster();
+        RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() {
+            @Override
+            public void setPartition(int partition) {
+            }
+
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                if (exception instanceof TimeoutException) {
+                    expiryCallbackCount.incrementAndGet();
+                    try {
+                        accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value,
+                            Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Unexpected interruption", e);
+                    }
+                } else if (exception != null)
+                    unexpectedException.compareAndSet(null, exception);
+            }
         };
 
         final long nowMs = time.milliseconds();
         for (int i = 0; i < messagesPerBatch; i++)
-            accumulator.append(tp1, 0L, key, value, null, callback, maxBlockTimeMs, false, nowMs);
+            accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, null, callbacks, maxBlockTimeMs, false, nowMs, cluster);
 
         // Advance the clock to expire the first batch.
         time.sleep(10000);
@@ -501,9 +508,9 @@ public class SenderTest {
         assertEquals(messagesPerBatch, expiryCallbackCount.get(), "Callbacks not invoked for expiry");
         assertNull(unexpectedException.get(), "Unexpected exception");
         // Make sure that the reconds were appended back to the batch.
-        assertTrue(accumulator.batches().containsKey(tp1));
-        assertEquals(1, accumulator.batches().get(tp1).size());
-        assertEquals(messagesPerBatch, accumulator.batches().get(tp1).peekFirst().recordCount);
+        assertNotNull(accumulator.getDeque(tp1));
+        assertEquals(1, accumulator.getDeque(tp1).size());
+        assertEquals(messagesPerBatch, accumulator.getDeque(tp1).peekFirst().recordCount);
     }
 
     /**
@@ -546,6 +553,76 @@ public class SenderTest {
         assertTrue(future.isDone(), "Request should be completed");
     }
 
+    @Test
+    public void testNodeLatencyStats() throws Exception {
+        try (Metrics m = new Metrics()) {
+            // Create a new record accumulator with non-0 partitionAvailabilityTimeoutMs
+            // otherwise it wouldn't update the stats.
+            RecordAccumulator.PartitionerConfig config = new RecordAccumulator.PartitionerConfig(false, 42);
+            long totalSize = 1024 * 1024;
+            accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
+                DELIVERY_TIMEOUT_MS, config, m, "producer-metrics", time, apiVersions, null,
+                new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics"));
+
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, 1,
+                senderMetrics, time, REQUEST_TIMEOUT, 1000L, null, new ApiVersions());
+
+            // Produce and send batch.
+            long time1 = time.milliseconds();
+            appendToAccumulator(tp0, 0L, "key", "value");
+            sender.runOnce();
+            assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight.");
+
+            // We were able to send the batch out, so both the ready and drain values should be the same.
+            RecordAccumulator.NodeLatencyStats stats = accumulator.getNodeLatencyStats(0);
+            assertEquals(time1, stats.drainTimeMs);
+            assertEquals(time1, stats.readyTimeMs);
+
+            // Make the node 1 not ready.
+            client.throttle(metadata.fetch().nodeById(0), 100);
+
+            // Time passes, but we don't have anything to send.
+            time.sleep(10);
+            sender.runOnce();
+            assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight.");
+
+            // Stats shouldn't change as we didn't have anything ready.
+            assertEquals(time1, stats.drainTimeMs);
+            assertEquals(time1, stats.readyTimeMs);
+
+            // Produce a new batch, but we won't be able to send it because node is not ready.
+            long time2 = time.milliseconds();
+            appendToAccumulator(tp0, 0L, "key", "value");
+            sender.runOnce();
+            assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight.");
+
+            // The ready time should move forward, but drain time shouldn't change.
+            assertEquals(time1, stats.drainTimeMs);
+            assertEquals(time2, stats.readyTimeMs);
+
+            // Time passes, we keep trying to send, but the node is not ready.
+            time.sleep(10);
+            time2 = time.milliseconds();
+            sender.runOnce();
+            assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight.");
+
+            // The ready time should move forward, but drain time shouldn't change.
+            assertEquals(time1, stats.drainTimeMs);
+            assertEquals(time2, stats.readyTimeMs);
+
+            // Finally, time passes beyond the throttle and the node is ready.
+            time.sleep(100);
+            time2 = time.milliseconds();
+            sender.runOnce();
+            assertEquals(2, client.inFlightRequestCount(), "We should have 2 produce requests in flight.");
+
+            // Both times should move forward
+            assertEquals(time2, stats.drainTimeMs);
+            assertEquals(time2, stats.readyTimeMs);
+        }
+    }
+
     @Test
     public void testInitProducerIdRequest() {
         final long producerId = 343434L;
@@ -1200,7 +1277,7 @@ public class SenderTest {
         client.respondToRequest(secondClientRequest, produceResponse(tp0, -1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1));
 
         sender.runOnce(); // receive response 1
-        Deque<ProducerBatch> queuedBatches = accumulator.batches().get(tp0);
+        Deque<ProducerBatch> queuedBatches = accumulator.getDeque(tp0);
 
         // Make sure that we are queueing the second batch first.
         assertEquals(1, queuedBatches.size());
@@ -1281,7 +1358,7 @@ public class SenderTest {
         assertTrue(request2.isDone());
         assertEquals(1, request2.get().offset());
         assertFalse(request1.isDone());
-        Deque<ProducerBatch> queuedBatches = accumulator.batches().get(tp0);
+        Deque<ProducerBatch> queuedBatches = accumulator.getDeque(tp0);
 
         assertEquals(0, queuedBatches.size());
         assertEquals(1, client.inFlightRequestCount());
@@ -1389,7 +1466,7 @@ public class SenderTest {
         assertEquals(1, request2.get().offset());
         assertEquals(0, sender.inFlightBatches(tp0).size());
 
-        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+        Deque<ProducerBatch> batches = accumulator.getDeque(tp0);
         assertEquals(1, batches.size());
         assertFalse(batches.peekFirst().hasSequence());
         assertFalse(client.hasInFlightRequests());
@@ -1444,7 +1521,7 @@ public class SenderTest {
         sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
         sender.runOnce(); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
 
-        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+        Deque<ProducerBatch> batches = accumulator.getDeque(tp0);
 
         // The epoch should be bumped and the second request should be requeued
         assertEquals(2, batches.size());
@@ -1524,7 +1601,7 @@ public class SenderTest {
         assertFutureFailure(request1, TimeoutException.class);
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
         assertFalse(client.hasInFlightRequests());
-        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+        Deque<ProducerBatch> batches = accumulator.getDeque(tp0);
         assertEquals(0, batches.size());
         assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
 
@@ -2337,10 +2414,11 @@ public class SenderTest {
             client.prepareMetadataUpdate(metadataUpdate1);
             // Send the first message.
             long nowMs = time.milliseconds();
+            Cluster cluster = TestUtils.singletonCluster();
             Future<RecordMetadata> f1 =
-                    accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs).future;
+                    accumulator.append(tp.topic(), tp.partition(), 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs, cluster).future;
             Future<RecordMetadata> f2 =
-                    accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs).future;
+                    accumulator.append(tp.topic(), tp.partition(), 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs, cluster).future;
             sender.runOnce(); // connect
             sender.runOnce(); // send produce request
 
@@ -2395,7 +2473,7 @@ public class SenderTest {
             assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence number should be 2");
             assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp), "The last ack'd sequence number should be 1");
             assertEquals(1L, f2.get().offset(), "Offset of the first message should be 1");
-            assertTrue(accumulator.batches().get(tp).isEmpty(), "There should be no batch in the accumulator");
+            assertTrue(accumulator.getDeque(tp).isEmpty(), "There should be no batch in the accumulator");
             assertTrue((Double) (m.metrics().get(senderMetrics.batchSplitRate).metricValue()) > 0, "There should be a split");
         }
     }
@@ -3063,8 +3141,8 @@ public class SenderTest {
     }
 
     private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value) throws InterruptedException {
-        return accumulator.append(tp, timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS,
-                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        return accumulator.append(tp.topic(), tp.partition(), timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS,
+                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), TestUtils.singletonCluster()).future;
     }
 
     @SuppressWarnings("deprecation")
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 64be3aeaf4..377db5ec06 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -693,8 +693,9 @@ public class TransactionManagerTest {
 
         assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
 
-        Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS,
-                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        Future<RecordMetadata> responseFuture1 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(),
+                "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(),
+                TestUtils.singletonCluster()).future;
         sender.runOnce();
         assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
 
@@ -723,8 +724,9 @@ public class TransactionManagerTest {
         assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch);
         assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
 
-        Future<RecordMetadata> responseFuture2 = accumulator.append(tp0, time.milliseconds(), "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS,
-                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        Future<RecordMetadata> responseFuture2 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(),
+                "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(),
+                TestUtils.singletonCluster()).future;
         sender.runOnce();
         sender.runOnce();
         assertEquals(0, transactionManager.firstInFlightSequence(tp0));
@@ -3178,7 +3180,7 @@ public class TransactionManagerTest {
         // New tp1 batches should not be drained from the accumulator while tp1 has in-flight requests using the old epoch
         appendToAccumulator(tp1);
         sender.runOnce();
-        assertEquals(1, accumulator.batches().get(tp1).size());
+        assertEquals(1, accumulator.getDeque(tp1).size());
 
         // Partition failover occurs and tp1 returns a NOT_LEADER_OR_FOLLOWER error
         // Despite having the old epoch, the batch should retry
@@ -3189,8 +3191,8 @@ public class TransactionManagerTest {
 
         // The batch with the old epoch should be successfully drained, leaving the new one in the queue
         sender.runOnce();
-        assertEquals(1, accumulator.batches().get(tp1).size());
-        assertNotEquals(tp1b2, accumulator.batches().get(tp1).peek());
+        assertEquals(1, accumulator.getDeque(tp1).size());
+        assertNotEquals(tp1b2, accumulator.getDeque(tp1).peek());
         assertEquals(epoch, tp1b2.producerEpoch());
 
         // After successfully retrying, there should be no in-flight batches for tp1 and the sequence should be 0
@@ -3205,7 +3207,7 @@ public class TransactionManagerTest {
 
         // The last batch should now be drained and sent
         runUntil(() -> transactionManager.hasInflightBatches(tp1));
-        assertTrue(accumulator.batches().get(tp1).isEmpty());
+        assertTrue(accumulator.getDeque(tp1).isEmpty());
         ProducerBatch tp1b3 = transactionManager.nextBatchBySequence(tp1);
         assertEquals(epoch + 1, tp1b3.producerEpoch());
 
@@ -3302,7 +3304,7 @@ public class TransactionManagerTest {
         // New tp1 batches should not be drained from the accumulator while tp1 has in-flight requests using the old epoch
         appendToAccumulator(tp1);
         sender.runOnce();
-        assertEquals(1, accumulator.batches().get(tp1).size());
+        assertEquals(1, accumulator.getDeque(tp1).size());
 
         // Partition failover occurs and tp1 returns a NOT_LEADER_OR_FOLLOWER error
         // Despite having the old epoch, the batch should retry
@@ -3313,8 +3315,8 @@ public class TransactionManagerTest {
 
         // The batch with the old epoch should be successfully drained, leaving the new one in the queue
         sender.runOnce();
-        assertEquals(1, accumulator.batches().get(tp1).size());
-        assertNotEquals(tp1b2, accumulator.batches().get(tp1).peek());
+        assertEquals(1, accumulator.getDeque(tp1).size());
+        assertNotEquals(tp1b2, accumulator.getDeque(tp1).peek());
         assertEquals(epoch, tp1b2.producerEpoch());
 
         // After successfully retrying, there should be no in-flight batches for tp1 and the sequence should be 0
@@ -3329,7 +3331,7 @@ public class TransactionManagerTest {
 
         // The last batch should now be drained and sent
         runUntil(() -> transactionManager.hasInflightBatches(tp1));
-        assertTrue(accumulator.batches().get(tp1).isEmpty());
+        assertTrue(accumulator.getDeque(tp1).isEmpty());
         ProducerBatch tp1b3 = transactionManager.nextBatchBySequence(tp1);
         assertEquals(epoch + 1, tp1b3.producerEpoch());
 
@@ -3344,8 +3346,8 @@ public class TransactionManagerTest {
 
     private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
         final long nowMs = time.milliseconds();
-        return accumulator.append(tp, nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS,
-                null, MAX_BLOCK_TIMEOUT, false, nowMs).future;
+        return accumulator.append(tp.topic(), tp.partition(), nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS,
+                null, MAX_BLOCK_TIMEOUT, false, nowMs, TestUtils.singletonCluster()).future;
     }
 
     private void verifyCommitOrAbortTransactionRetriable(TransactionResult firstTransactionResult,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index a7c1bb7ada..33f0892a6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
index b14c846925..7848c2db87 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java
index 40f66f036c..929b14ab29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
index a435cafb89..90ffa3a4a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -16,12 +16,11 @@
  */
 package org.apache.kafka.streams.processor;
 
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.streams.Topology;
 
 /**
  * Determine how records are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
- * {@link DefaultPartitioner} will be used to determine the partition.
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
  * <p>
  * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it, so
  * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
index a90a028d72..f5c9c158bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -15,8 +15,6 @@
  * limitations under the License.
  */
 package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -25,12 +23,15 @@ public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
 
     private final Cluster cluster;
     private final Serializer<K> keySerializer;
-    private final DefaultPartitioner defaultPartitioner;
 
+    @SuppressWarnings("deprecation")
+    private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner;
+
+    @SuppressWarnings("deprecation")
     public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
         this.cluster = cluster;
         this.keySerializer = keySerializer;
-        this.defaultPartitioner = new DefaultPartitioner();
+        this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner();
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index adebf167de..a659525727 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -55,7 +54,8 @@ public class WindowedStreamPartitionerTest {
     @Test
     public void testCopartitioning() {
         final Random rand = new Random();
-        final DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
+        @SuppressWarnings("deprecation")
+        final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner();
         final WindowedSerializer<Integer> timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer);
         final WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(timeWindowedSerializer);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 48364f27db..2da9b397e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -784,7 +783,7 @@ public class RecordCollectorTest {
                 new MockClientSupplier() {
                     @Override
                     public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-                        return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                        return new MockProducer<byte[], byte[]>(cluster, true, byteArraySerializer, byteArraySerializer) {
                             @Override
                             public void abortTransaction() {
                                 functionCalled.set(true);
@@ -816,7 +815,7 @@ public class RecordCollectorTest {
                 new MockClientSupplier() {
                     @Override
                     public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-                        return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                        return new MockProducer<byte[], byte[]>(cluster, true, byteArraySerializer, byteArraySerializer) {
                             @Override
                             public List<PartitionInfo> partitionsFor(final String topic) {
                                 return Collections.emptyList();
@@ -889,7 +888,7 @@ public class RecordCollectorTest {
             new MockClientSupplier() {
                 @Override
                 public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-                    return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    return new MockProducer<byte[], byte[]>(cluster, true, byteArraySerializer, byteArraySerializer) {
                         @Override
                         public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
                             callback.onCompletion(null, exception);
@@ -912,7 +911,7 @@ public class RecordCollectorTest {
             new MockClientSupplier() {
                 @Override
                 public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-                    return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    return new MockProducer<byte[], byte[]>(cluster, true, byteArraySerializer, byteArraySerializer) {
                         @Override
                         public synchronized List<PartitionInfo> partitionsFor(final String topic) {
                             throw exception;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 880f2cb3bf..53b80ae38b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.streams.KafkaClientSupplier;
@@ -69,7 +68,7 @@ public class MockClientSupplier implements KafkaClientSupplier {
         } else {
             assertFalse(config.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
         }
-        final MockProducer<byte[], byte[]> producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
+        final MockProducer<byte[], byte[]> producer = new MockProducer<>(cluster, true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
         producers.add(producer);
         return producer;
     }