Posted to commits@kafka.apache.org by jg...@apache.org on 2019/03/07 22:45:30 UTC

[kafka] branch 2.1 updated: KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b4952ba  KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
b4952ba is described below

commit b4952ba7860235dedf7255d750e1b7418b23e46a
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri Mar 8 03:58:21 2019 +0530

    KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
    
    In KAFKA-5503, we added a check for the `running` flag in the loop inside `maybeWaitForProducerId`. This handles a concurrent call to Sender close() while we attempt to get the ProducerId, and avoids blocking indefinitely when the producer is shutting down.
    
    This created a corner case where the Sender thread could block indefinitely if a ProducerId reset raced with a call to close the Sender thread. The fix is to check the `forceClose` flag instead of the `running` flag in the loop inside `maybeWaitForProducerId`.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/producer/internals/Sender.java   |  2 +-
 .../clients/producer/internals/SenderTest.java     | 75 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)
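For orientation before the diff: below is a minimal, self-contained sketch (not part of this patch, and not the actual Sender source) of the shutdown interplay the commit message describes. The `running` and `forceClose` flags and the `maybeWaitForProducerId` loop mirror the commit message; the run-loop shape, the `tryInitProducerId` stub, and every other name are illustrative assumptions.

    // Illustrative only: a compressed model of the Sender shutdown flow.
    public class SenderShutdownSketch {
        // Cleared by initiateClose(): stop taking new work, drain what remains.
        private volatile boolean running = true;
        // Set by forceClose(): abort remaining work and exit immediately.
        private volatile boolean forceClose = false;

        private volatile boolean hasProducerId = false; // reset after OutOfOrderSequence
        private volatile boolean hasUndrained = true;   // batches still in the accumulator

        // Before the fix: gated on `running`. After initiateClose() this loop
        // exits at once, so a reset ProducerId is never re-acquired...
        void maybeWaitForProducerIdBuggy() {
            while (running && !hasProducerId) {
                hasProducerId = tryInitProducerId();
            }
        }

        // ...yet the graceful-close phase of the run loop keeps spinning until
        // the accumulator drains, which it cannot do without a ProducerId.
        // That is the hang this commit fixes.
        void runLoop() {
            while (running || (!forceClose && hasUndrained)) {
                maybeWaitForProducerIdBuggy();           // swap in the fixed variant to terminate
                if (hasProducerId) hasUndrained = false; // stand-in for draining and sending
            }
        }

        // After the fix: gated on `forceClose`. A graceful close can still
        // re-acquire a ProducerId and drain; only a force close breaks out.
        void maybeWaitForProducerIdFixed() {
            while (!forceClose && !hasProducerId) {
                hasProducerId = tryInitProducerId();
            }
        }

        private boolean tryInitProducerId() { return true; } // stub for the InitProducerId round trip

        public static void main(String[] args) {
            SenderShutdownSketch s = new SenderShutdownSketch();
            s.running = false;               // initiateClose() arrives during a ProducerId reset
            // s.runLoop();                  // with the buggy variant this spins forever
            s.maybeWaitForProducerIdFixed(); // with the fix, the id is re-acquired
            s.runLoop();                     // so the loop drains and exits
        }
    }

The design point the fix captures: `running == false` means "stop accepting work", not "stop making progress"; only `forceClose` means the latter, so only `forceClose` may abort the wait for a ProducerId.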

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1879d6f..c1fc40b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -494,7 +494,7 @@ public class Sender implements Runnable {
     }
 
     private void maybeWaitForProducerId() {
-        while (running && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
+        while (!forceClose && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
             Node node = null;
             try {
                 node = awaitLeastLoadedNodeReady(requestTimeoutMs);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 5bef134..ffc0763 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -81,6 +81,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -1201,6 +1202,80 @@ public class SenderTest {
     }
 
     @Test
+    public void testCloseWithProducerIdReset() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
+        setupWithTransactionState(transactionManager);
+
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
+            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect and send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.initiateClose(); // initiate close
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                prepareInitPidResponse(Errors.NONE, producerId + 1, (short) 1);
+                sender.run(time.milliseconds());
+                return !accumulator.hasUndrained();
+            }
+        }, 5000, "Failed to drain batches");
+    }
+
+    @Test
+    public void testForceCloseWithProducerIdReset() throws Exception {
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(1L, (short) 0));
+        setupWithTransactionState(transactionManager);
+
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
+            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect and send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+        sender.forceClose(); // initiate force close
+        sender.run(time.milliseconds()); // this should not block
+        sender.run(); // run main loop to test forceClose flag
+        assertTrue("Pending batches are not aborted.", !accumulator.hasUndrained());
+        assertTrue(successfulResponse.isDone());
+    }
+
+    @Test
     public void testBatchesDrainedWithOldProducerIdShouldFailWithOutOfOrderSequenceOnSubsequentRetry() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();