You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/08/12 21:17:53 UTC
[kafka] branch trunk updated: MINOR: Clean up the sticky
partitioner code a bit (#7151)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 88087e9 MINOR: Clean up the sticky partitioner code a bit (#7151)
88087e9 is described below
commit 88087e91dd4eed1ee3c3e12961db84b7b56c34a0
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Mon Aug 12 14:17:28 2019 -0700
MINOR: Clean up the sticky partitioner code a bit (#7151)
Reviewers: Colin P. McCabe <cm...@apache.org>, Lucas Bradstreet <lu...@gmail.com>
---
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../producer/internals/StickyPartitionCache.java | 4 +-
.../producer/internals/TransactionManagerTest.java | 72 +++++++++++-----------
docs/upgrade.html | 4 ++
4 files changed, 43 insertions(+), 39 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c586fb4..3b3589f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -925,7 +925,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
- log.trace("Retrying because of a new batch, sending the record to topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
+ log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/StickyPartitionCache.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/StickyPartitionCache.java
index 117eb5f..bfe1ac6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/StickyPartitionCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/StickyPartitionCache.java
@@ -51,14 +51,14 @@ public class StickyPartitionCache {
// triggered the new batch matches the sticky partition that needs to be changed.
if (oldPart == null || oldPart == prevPartition) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
- Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
if (availablePartitions.size() < 1) {
+ Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
newPart = availablePartitions.get(0).partition();
} else {
while (newPart == null || newPart.equals(oldPart)) {
- random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
+ Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index cca5771..24869aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -154,7 +154,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxn(tp0, Errors.NONE);
prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -800,7 +800,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1251,7 +1251,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
Future<RecordMetadata> responseFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
sender.runOnce();
@@ -1275,7 +1275,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
sender.runOnce();
@@ -1309,14 +1309,14 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxn(tp0, Errors.NONE);
Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
- "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
sender.runOnce();
assertTrue(transactionManager.isPartitionAdded(tp0));
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
- "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
sender.runOnce();
assertTrue(transactionManager.hasAbortableError());
@@ -1342,7 +1342,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
sender.runOnce();
@@ -1376,7 +1376,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxn(tp0, Errors.NONE);
Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
- "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
sender.runOnce();
assertTrue(transactionManager.isPartitionAdded(tp0));
@@ -1389,7 +1389,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
- "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
sender.runOnce();
assertTrue(transactionManager.hasAbortableError());
@@ -1419,7 +1419,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
sender.runOnce();
@@ -1472,7 +1472,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
@@ -1518,7 +1518,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1534,7 +1534,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp1);
Future<RecordMetadata> secondResponseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, epoch, pid);
prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -1570,7 +1570,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1609,7 +1609,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
TransactionalRequestResult commitResult = transactionManager.beginCommit();
assertFalse(responseFuture.isDone());
@@ -1659,7 +1659,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1688,7 +1688,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1726,7 +1726,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxn(tp0, Errors.NONE);
sender.runOnce();
@@ -1776,7 +1776,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxn(tp0, Errors.NONE);
sender.runOnce();
@@ -1832,7 +1832,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
sender.runOnce(); // Send AddPartitionsRequest
@@ -1861,7 +1861,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
@@ -1894,7 +1894,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
sender.runOnce(); // Send AddPartitions and let it fail
assertFalse(responseFuture.isDone());
@@ -1934,7 +1934,7 @@ public class TransactionManagerTest {
prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, producerEpoch);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
sender.runOnce(); // Send AddPartitions
sender.runOnce(); // Send ProduceRequest and let it fail
@@ -1970,7 +1970,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch, pid);
@@ -2131,11 +2131,11 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp1);
accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
assertFalse(transactionManager.isSendToPartitionAllowed(tp1));
@@ -2188,7 +2188,7 @@ public class TransactionManagerTest {
Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1),
Collections.emptySet(), Collections.emptySet());
accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, Collections.singleton(node1),
Integer.MAX_VALUE,
time.milliseconds());
@@ -2208,7 +2208,7 @@ public class TransactionManagerTest {
transactionManager.beginTransaction();
// Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain.
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
Node node1 = new Node(0, "localhost", 1111);
PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
@@ -2235,7 +2235,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
@@ -2264,7 +2264,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
@@ -2313,9 +2313,9 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp1);
Future<RecordMetadata> firstBatchResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> secondBatchResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(firstBatchResponse.isDone());
assertFalse(secondBatchResponse.isDone());
@@ -2378,7 +2378,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
@@ -2445,7 +2445,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
@@ -2644,7 +2644,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -2686,7 +2686,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxn(tp0, error);
sender.runOnce(); // attempt send addPartitions.
diff --git a/docs/upgrade.html b/docs/upgrade.html
index e55e3b8..5fe06a4 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -32,6 +32,10 @@
</li>
<li>The internal <code>PartitionAssignor</code> interface has been deprecated and replaced with a new <code>ConsumerPartitionAssignor</code> in the public API. Users
implementing a custom PartitionAssignor should migrate to the new interface as soon as possible.</li>
+ <li>The <code>DefaultPartitioner</code> now uses a sticky partitioning strategy. This means that records for specific topic with null keys and no assigned partition
+ will be sent to the same partition until the batch is ready to be sent. When a new batch is created, a new partition is chosen. This decreases latency to produce, but
+ it may result in uneven distribution of records across partitions in edge cases. Generally users will not be impacted, but this difference may be noticeable in tests and
+ other situations producing records for a very short amount of time.</li>
</ul>
<h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4>