You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/08/22 06:36:27 UTC

[kafka] branch 2.3 updated: KAFKA-8325; Remove batch from in-flight requests on MESSAGE_TOO_LARGE errors (#7176)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new db8cb96  KAFKA-8325; Remove batch from in-flight requests on MESSAGE_TOO_LARGE errors (#7176)
db8cb96 is described below

commit db8cb96ab151c52d41078032c94e7ff2091460a0
Author: Bob Barrett <bo...@confluent.io>
AuthorDate: Wed Aug 21 23:29:20 2019 -0700

    KAFKA-8325; Remove batch from in-flight requests on MESSAGE_TOO_LARGE errors (#7176)
    
    This patch fixes a bug in the handling of MESSAGE_TOO_LARGE errors. Previously, the large batch was split, the smaller batches were re-added to the accumulator, and the original batch was deallocated, but it was not removed from the list of in-flight batches. When that batch later expired from the in-flight batches, the producer tried to deallocate it a second time, causing an error. This patch changes the behavior so the batch is correctly removed from the list of in-flight batches before it is deallocated.
    
    Reviewers: Luke Stephenson, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/producer/internals/Sender.java   | 15 +++++---
 .../clients/producer/internals/SenderTest.java     | 44 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index efa418c..ebfea0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -159,7 +159,7 @@ public class Sender implements Runnable {
         return inFlightBatches.containsKey(tp) ? inFlightBatches.get(tp) : new ArrayList<>();
     }
 
-    public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
+    private void maybeRemoveFromInflightBatches(ProducerBatch batch) {
         List<ProducerBatch> batches = inFlightBatches.get(batch.topicPartition);
         if (batches != null) {
             batches.remove(batch);
@@ -169,6 +169,11 @@ public class Sender implements Runnable {
         }
     }
 
+    private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
+        maybeRemoveFromInflightBatches(batch);
+        this.accumulator.deallocate(batch);
+    }
+
     /**
      *  Get the in-flight batches that has reached delivery timeout.
      */
@@ -625,7 +630,7 @@ public class Sender implements Runnable {
             if (transactionManager != null)
                 transactionManager.removeInFlightBatch(batch);
             this.accumulator.splitAndReenqueue(batch);
-            this.accumulator.deallocate(batch);
+            maybeRemoveAndDeallocateBatch(batch);
             this.sensors.recordBatchSplit();
         } else if (error != Errors.NONE) {
             if (canRetry(batch, response, now)) {
@@ -700,8 +705,7 @@ public class Sender implements Runnable {
         }
 
         if (batch.done(response.baseOffset, response.logAppendTime, null)) {
-            maybeRemoveFromInflightBatches(batch);
-            this.accumulator.deallocate(batch);
+            maybeRemoveAndDeallocateBatch(batch);
         }
     }
 
@@ -724,8 +728,7 @@ public class Sender implements Runnable {
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
 
         if (batch.done(baseOffset, logAppendTime, exception)) {
-            maybeRemoveFromInflightBatches(batch);
-            this.accumulator.deallocate(batch);
+            maybeRemoveAndDeallocateBatch(batch);
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 194176d..5317b4c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2109,7 +2109,7 @@ public class SenderTest {
     @Test
     public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
         long deliverTimeoutMs = 1500L;
-        // create a producer batch with more than one record so it is eligible to split
+        // create a producer batch with more than one record so it is eligible for splitting
         Future<RecordMetadata> request1 =
             accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
                 MAX_BLOCK_TIMEOUT).future;
@@ -2117,7 +2117,8 @@ public class SenderTest {
             accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
                 MAX_BLOCK_TIMEOUT).future;
 
-        sender.runOnce();  // send request
+        // send request
+        sender.runOnce();
         assertEquals(1, client.inFlightRequestCount());
         // return a MESSAGE_TOO_LARGE error
         client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
@@ -2309,6 +2310,45 @@ public class SenderTest {
         verify(client, times(2)).poll(eq(RETRY_BACKOFF_MS), anyLong());
     }
 
+    @Test
+    public void testTooLargeBatchesAreSafelyRemoved() throws InterruptedException {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100);
+
+        setupWithTransactionState(txnManager, false, null);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartitionToTransaction(tp0);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // create a producer batch with more than one record so it is eligible for splitting
+        Future<RecordMetadata> request1 =
+                accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
+                        MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> request2 =
+                accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
+                        MAX_BLOCK_TIMEOUT).future;
+
+        // send request
+        sender.runOnce();
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+        // return a MESSAGE_TOO_LARGE error
+        client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
+        sender.runOnce();
+
+        // process retried response
+        sender.runOnce();
+        client.respond(produceResponse(tp0, 0, Errors.NONE, 0));
+        sender.runOnce();
+
+        // In-flight batches should be empty. Sleep past the expiration time of the batch and run once, no error should be thrown
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+        time.sleep(2000);
+        sender.runOnce();
+    }
+
     class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {
 
         private TransactionResult requiredResult;