You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/08/12 21:17:53 UTC

[kafka] branch trunk updated: MINOR: Clean up the sticky partitioner code a bit (#7151)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 88087e9  MINOR: Clean up the sticky partitioner code a bit (#7151)
88087e9 is described below

commit 88087e91dd4eed1ee3c3e12961db84b7b56c34a0
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Mon Aug 12 14:17:28 2019 -0700

    MINOR: Clean up the sticky partitioner code a bit (#7151)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Lucas Bradstreet <lu...@gmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      |  2 +-
 .../producer/internals/StickyPartitionCache.java   |  4 +-
 .../producer/internals/TransactionManagerTest.java | 72 +++++++++++-----------
 docs/upgrade.html                                  |  4 ++
 4 files changed, 43 insertions(+), 39 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c586fb4..3b3589f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -925,7 +925,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 partition = partition(record, serializedKey, serializedValue, cluster);
                 tp = new TopicPartition(record.topic(), partition);
                 if (log.isTraceEnabled()) {
-                    log.trace("Retrying because of a new batch, sending the record to topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
+                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                 }
                 // producer callback will make sure to call both 'callback' and interceptor callback
                 interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/StickyPartitionCache.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/StickyPartitionCache.java
index 117eb5f..bfe1ac6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/StickyPartitionCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/StickyPartitionCache.java
@@ -51,14 +51,14 @@ public class StickyPartitionCache {
         // triggered the new batch matches the sticky partition that needs to be changed.
         if (oldPart == null || oldPart == prevPartition) {
             List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
-            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
             if (availablePartitions.size() < 1) {
+                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                 newPart = random % partitions.size();
             } else if (availablePartitions.size() == 1) {
                 newPart = availablePartitions.get(0).partition();
             } else {
                 while (newPart == null || newPart.equals(oldPart)) {
-                    random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
+                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                     newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                 }
             }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index cca5771..24869aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -154,7 +154,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
         FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
         prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -800,7 +800,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1251,7 +1251,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
 
         Future<RecordMetadata> responseFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
         sender.runOnce();
@@ -1275,7 +1275,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
         sender.runOnce();
@@ -1309,14 +1309,14 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
 
         Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
-                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
         sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
 
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
         Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
-                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
         prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
         sender.runOnce();
         assertTrue(transactionManager.hasAbortableError());
@@ -1342,7 +1342,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
         sender.runOnce();
@@ -1376,7 +1376,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
 
         Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
-                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
         sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
 
@@ -1389,7 +1389,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
         Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
-                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
         prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
         sender.runOnce();
         assertTrue(transactionManager.hasAbortableError());
@@ -1419,7 +1419,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
         sender.runOnce();
@@ -1472,7 +1472,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
 
@@ -1518,7 +1518,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1534,7 +1534,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp1);
         Future<RecordMetadata> secondResponseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, epoch, pid);
         prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -1570,7 +1570,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1609,7 +1609,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         TransactionalRequestResult commitResult = transactionManager.beginCommit();
         assertFalse(responseFuture.isDone());
@@ -1659,7 +1659,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1688,7 +1688,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1726,7 +1726,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
         sender.runOnce();
@@ -1776,7 +1776,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
         sender.runOnce();
@@ -1832,7 +1832,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
         sender.runOnce();  // Send AddPartitionsRequest
@@ -1861,7 +1861,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
 
@@ -1894,7 +1894,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         sender.runOnce();  // Send AddPartitions and let it fail
         assertFalse(responseFuture.isDone());
@@ -1934,7 +1934,7 @@ public class TransactionManagerTest {
         prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, producerEpoch);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         sender.runOnce();  // Send AddPartitions
         sender.runOnce();  // Send ProduceRequest and let it fail
@@ -1970,7 +1970,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch, pid);
@@ -2131,11 +2131,11 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
         accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false);
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp1);
         accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false);
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
 
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
         assertFalse(transactionManager.isSendToPartitionAllowed(tp1));
@@ -2188,7 +2188,7 @@ public class TransactionManagerTest {
         Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1),
                 Collections.emptySet(), Collections.emptySet());
         accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false);
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
         Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, Collections.singleton(node1),
                 Integer.MAX_VALUE,
                 time.milliseconds());
@@ -2208,7 +2208,7 @@ public class TransactionManagerTest {
         transactionManager.beginTransaction();
         // Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain.
         accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false);
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
         Node node1 = new Node(0, "localhost", 1111);
         PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
 
@@ -2235,7 +2235,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
@@ -2264,7 +2264,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
 
@@ -2313,9 +2313,9 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp1);
 
         Future<RecordMetadata> firstBatchResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
         Future<RecordMetadata> secondBatchResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
-               "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+               "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(firstBatchResponse.isDone());
         assertFalse(secondBatchResponse.isDone());
@@ -2378,7 +2378,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
 
@@ -2445,7 +2445,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
 
         assertFalse(responseFuture.isDone());
 
@@ -2644,7 +2644,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false);
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
 
@@ -2686,7 +2686,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT,  false).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxn(tp0, error);
         sender.runOnce();  // attempt send addPartitions.
diff --git a/docs/upgrade.html b/docs/upgrade.html
index e55e3b8..5fe06a4 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -32,6 +32,10 @@
     </li>
     <li>The internal <code>PartitionAssignor</code> interface has been deprecated and replaced with a new <code>ConsumerPartitionAssignor</code> in the public API. Users
         implementing a custom PartitionAssignor should migrate to the new interface as soon as possible.</li>
+    <li>The <code>DefaultPartitioner</code> now uses a sticky partitioning strategy. This means that records for specific topic with null keys and no assigned partition 
+        will be sent to the same partition until the batch is ready to be sent. When a new batch is created, a new partition is chosen. This decreases latency to produce, but
+        it may result in uneven distribution of records across partitions in edge cases. Generally users will not be impacted, but this difference may be noticeable in tests and
+        other situations producing records for a very short amount of time.</li>
 </ul>
 
 <h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4>