You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/08/22 06:36:27 UTC
[kafka] branch 2.3 updated: KAFKA-8325;
Remove batch from in-flight requests on MESSAGE_TOO_LARGE errors
(#7176)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new db8cb96 KAFKA-8325; Remove batch from in-flight requests on MESSAGE_TOO_LARGE errors (#7176)
db8cb96 is described below
commit db8cb96ab151c52d41078032c94e7ff2091460a0
Author: Bob Barrett <bo...@confluent.io>
AuthorDate: Wed Aug 21 23:29:20 2019 -0700
KAFKA-8325; Remove batch from in-flight requests on MESSAGE_TOO_LARGE errors (#7176)
This patch fixes a bug in the handling of MESSAGE_TOO_LARGE errors. The large batch is split, the smaller batches are re-added to the accumulator, and the batch is deallocated, but it was not removed from the list of in-flight batches. When the batch was eventually expired from the in-flight batches, the producer would try to deallocate it a second time, causing an error. This patch changes the behavior to correctly remove the batch from the list of in-flight requests.
Reviewers: Luke Stephenson, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/producer/internals/Sender.java | 15 +++++---
.../clients/producer/internals/SenderTest.java | 44 +++++++++++++++++++++-
2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index efa418c..ebfea0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -159,7 +159,7 @@ public class Sender implements Runnable {
return inFlightBatches.containsKey(tp) ? inFlightBatches.get(tp) : new ArrayList<>();
}
- public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
+ private void maybeRemoveFromInflightBatches(ProducerBatch batch) {
List<ProducerBatch> batches = inFlightBatches.get(batch.topicPartition);
if (batches != null) {
batches.remove(batch);
@@ -169,6 +169,11 @@ public class Sender implements Runnable {
}
}
+ private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
+ maybeRemoveFromInflightBatches(batch);
+ this.accumulator.deallocate(batch);
+ }
+
/**
* Get the in-flight batches that have reached delivery timeout.
*/
@@ -625,7 +630,7 @@ public class Sender implements Runnable {
if (transactionManager != null)
transactionManager.removeInFlightBatch(batch);
this.accumulator.splitAndReenqueue(batch);
- this.accumulator.deallocate(batch);
+ maybeRemoveAndDeallocateBatch(batch);
this.sensors.recordBatchSplit();
} else if (error != Errors.NONE) {
if (canRetry(batch, response, now)) {
@@ -700,8 +705,7 @@ public class Sender implements Runnable {
}
if (batch.done(response.baseOffset, response.logAppendTime, null)) {
- maybeRemoveFromInflightBatches(batch);
- this.accumulator.deallocate(batch);
+ maybeRemoveAndDeallocateBatch(batch);
}
}
@@ -724,8 +728,7 @@ public class Sender implements Runnable {
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
if (batch.done(baseOffset, logAppendTime, exception)) {
- maybeRemoveFromInflightBatches(batch);
- this.accumulator.deallocate(batch);
+ maybeRemoveAndDeallocateBatch(batch);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 194176d..5317b4c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2109,7 +2109,7 @@ public class SenderTest {
@Test
public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
long deliverTimeoutMs = 1500L;
- // create a producer batch with more than one record so it is eligible to split
+ // create a producer batch with more than one record so it is eligible for splitting
Future<RecordMetadata> request1 =
accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
@@ -2117,7 +2117,8 @@ public class SenderTest {
accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
- sender.runOnce(); // send request
+ // send request
+ sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
// return a MESSAGE_TOO_LARGE error
client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
@@ -2309,6 +2310,45 @@ public class SenderTest {
verify(client, times(2)).poll(eq(RETRY_BACKOFF_MS), anyLong());
}
+ @Test
+ public void testTooLargeBatchesAreSafelyRemoved() throws InterruptedException {
+ ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+ TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100);
+
+ setupWithTransactionState(txnManager, false, null);
+ doInitTransactions(txnManager, producerIdAndEpoch);
+
+ txnManager.beginTransaction();
+ txnManager.maybeAddPartitionToTransaction(tp0);
+ client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE)));
+ sender.runOnce();
+
+ // create a producer batch with more than one record so it is eligible for splitting
+ Future<RecordMetadata> request1 =
+ accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
+ MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> request2 =
+ accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
+ MAX_BLOCK_TIMEOUT).future;
+
+ // send request
+ sender.runOnce();
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+ // return a MESSAGE_TOO_LARGE error
+ client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
+ sender.runOnce();
+
+ // process retried response
+ sender.runOnce();
+ client.respond(produceResponse(tp0, 0, Errors.NONE, 0));
+ sender.runOnce();
+
+ // In-flight batches should be empty. Sleep past the expiration time of the batch and run once, no error should be thrown
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+ time.sleep(2000);
+ sender.runOnce();
+ }
+
class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {
private TransactionResult requiredResult;