Posted to commits@kafka.apache.org by jg...@apache.org on 2017/10/06 22:04:31 UTC

kafka git commit: KAFKA-6015; Fix NPE in RecordAccumulator after ProducerId reset

Repository: kafka
Updated Branches:
  refs/heads/trunk 10cd98cc8 -> 105ab47ed


KAFKA-6015; Fix NPE in RecordAccumulator after ProducerId reset

It is possible for batches that already have sequence numbers to be sitting in the `deque` while, at the same time, the in-flight batches tracked by the `TransactionManager` are removed because of a producerId reset.

In this case, when the batches in the `deque` are drained, we will get a `NullPointerException` in the background thread due to this line:

```java
if (first.hasSequence() && first.baseSequence() != transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
```

In particular, `transactionManager.nextBatchBySequence` returns null because there are no in-flight batches being tracked, so the call to `baseSequence()` on its result throws.

In this patch, we simply allow the batches in the `deque` to be drained if the `TransactionManager` is not tracking any in-flight batches for the partition. If they succeed, well and good. If the response comes back with an error, the batches will ultimately be failed in the producer with an `OutOfOrderSequenceException`.
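
As a hedged illustration of the guard, here is a self-contained Java sketch that models only the drain condition, not the real `RecordAccumulator`. `Batch`, `mustWaitOld`, and `mustWaitNew` are hypothetical names, and `NO_SEQUENCE = -1` is assumed to play the role of `RecordBatch.NO_SEQUENCE`. A `true` result corresponds to the `break` in the drain loop, i.e. the queued batch is held back.

```java
// Hypothetical, simplified model of the drain check; not Kafka's actual classes.
public class DrainGuardSketch {
    static final int NO_SEQUENCE = -1; // plays the role of RecordBatch.NO_SEQUENCE

    // Minimal stand-in for ProducerBatch: only the base sequence matters here.
    static final class Batch {
        final int baseSequence;
        Batch(int baseSequence) { this.baseSequence = baseSequence; }
        boolean hasSequence() { return baseSequence != NO_SEQUENCE; }
    }

    // Pre-fix shape: dereferences the next in-flight batch unconditionally, so a null
    // (nothing tracked after a producerId reset) throws NullPointerException.
    static boolean mustWaitOld(Batch first, Batch nextInFlight) {
        return first.hasSequence() && first.baseSequence != nextInFlight.baseSequence;
    }

    // Post-fix shape: only hold back a retried batch when some batch is actually
    // tracked as the first one in flight and it is not this batch.
    static boolean mustWaitNew(Batch first, int firstInFlightSequence) {
        return firstInFlightSequence != NO_SEQUENCE
                && first.hasSequence()
                && first.baseSequence != firstInFlightSequence;
    }

    public static void main(String[] args) {
        Batch retried = new Batch(5); // queued batch that already carries a sequence

        try {
            mustWaitOld(retried, null); // nothing in flight after the reset
        } catch (NullPointerException e) {
            System.out.println("old check: NullPointerException");
        }
        System.out.println("nothing in flight: wait=" + mustWaitNew(retried, NO_SEQUENCE)); // wait=false
        System.out.println("batch 3 in flight: wait=" + mustWaitNew(retried, 3));           // wait=true
        System.out.println("batch 5 in flight: wait=" + mustWaitNew(retried, 5));           // wait=false
    }
}
```

With nothing tracked, the new check lets the batch drain; it then either succeeds or is failed later, as described above.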

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #4022 from apurvam/KAFKA-6015-npe-in-record-accumulator


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/105ab47e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/105ab47e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/105ab47e

Branch: refs/heads/trunk
Commit: 105ab47ed90c8a0e83c159c97a8f2294c5582657
Parents: 10cd98c
Author: Apurva Mehta <ap...@confluent.io>
Authored: Fri Oct 6 15:01:22 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Oct 6 15:01:29 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   |   5 +-
 .../clients/producer/internals/Sender.java      |   4 +-
 .../producer/internals/TransactionManager.java  |  19 +++
 .../clients/producer/internals/SenderTest.java  | 116 +++++++++++++++++++
 .../scala/kafka/log/ProducerStateManager.scala  |  20 +++-
 5 files changed, 157 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/105ab47e/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 46cf6c4..ba8c28e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -538,8 +538,9 @@ public final class RecordAccumulator {
                                                 // on the client after being sent to the broker at least once.
                                                 break;
 
-                                            if (first.hasSequence()
-                                                    && first.baseSequence() != transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
+                                            int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
+                                            if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
+                                                    && first.baseSequence() != firstInFlightSequence)
                                                 // If the queued batch already has an assigned sequence, then it is being
                                                 // retried. In this case, we wait until the next immediate batch is ready
                                                 // and drain that. We only move on when the next in line batch is complete (either successfully

http://git-wip-us.apache.org/repos/asf/kafka/blob/105ab47e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8b5780b..7eea499 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -528,8 +528,8 @@ public class Sender implements Runnable {
                 } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                     // If idempotence is enabled only retry the request if the current producer id is the same as
                     // the producer id of the batch.
-                    log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
-                            batch.baseSequence());
+                    log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
+                            batch.topicPartition, batch.producerId(), batch.baseSequence());
                     reenqueueBatch(batch, now);
                 } else {
                     failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
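
The `Sender` branch above retries a batch only when its producer id and epoch still match the current ones, and otherwise fails it with an `OutOfOrderSequenceException`. The following is a hypothetical Java sketch of just that decision, assuming the error is retriable and retries remain; `Outcome`, `onRetriableError`, and the local `ProducerIdAndEpoch` class are illustrative, and `null` is used here to model "no producer id" after a reset. The ids match those in the new `SenderTest` cases.

```java
// Hypothetical, simplified model of the retry-or-fail decision; names are illustrative.
public class RetryDecisionSketch {
    enum Outcome { RETRY, FAIL_WITH_OUT_OF_ORDER_SEQUENCE }

    static final class ProducerIdAndEpoch {
        final long producerId;
        final short epoch;
        ProducerIdAndEpoch(long producerId, short epoch) { this.producerId = producerId; this.epoch = epoch; }
        boolean matches(long otherId, short otherEpoch) { return producerId == otherId && epoch == otherEpoch; }
    }

    // Assumes the error is retriable and the batch still has retries left.
    static Outcome onRetriableError(ProducerIdAndEpoch current, long batchProducerId, short batchEpoch) {
        if (current != null && current.matches(batchProducerId, batchEpoch))
            return Outcome.RETRY;                         // same producer: re-enqueue the batch
        return Outcome.FAIL_WITH_OUT_OF_ORDER_SEQUENCE;   // batch belongs to an old producer id
    }

    public static void main(String[] args) {
        ProducerIdAndEpoch afterReset = new ProducerIdAndEpoch(343435L, (short) 0);
        System.out.println(onRetriableError(afterReset, 343434L, (short) 0)); // FAIL_WITH_OUT_OF_ORDER_SEQUENCE
        System.out.println(onRetriableError(afterReset, 343435L, (short) 0)); // RETRY
    }
}
```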

http://git-wip-us.apache.org/repos/asf/kafka/blob/105ab47e/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index a0b45cd..006a12b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
@@ -435,6 +436,24 @@ public class TransactionManager {
         inflightBatchesBySequence.get(batch.topicPartition).offer(batch);
     }
 
+    /**
+     * Returns the first inflight sequence for a given partition. This is the base sequence of an inflight batch with
+     * the lowest sequence number.
+     * @return the lowest inflight sequence if the transaction manager is tracking inflight requests for this partition.
+     *         If there are no inflight requests being tracked for this partition, this method will return
+     *         RecordBatch.NO_SEQUENCE.
+     */
+    synchronized int firstInFlightSequence(TopicPartition topicPartition) {
+        PriorityQueue<ProducerBatch> inFlightBatches = inflightBatchesBySequence.get(topicPartition);
+        if (inFlightBatches == null)
+            return RecordBatch.NO_SEQUENCE;
+
+        ProducerBatch firstInFlightBatch = inFlightBatches.peek();
+        if (firstInFlightBatch == null)
+            return RecordBatch.NO_SEQUENCE;
+
+        return firstInFlightBatch.baseSequence();
+    }
 
     synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
         PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(topicPartition);
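
The new `firstInFlightSequence` relies on the per-partition `PriorityQueue` keeping the lowest base sequence at its head, and it returns `NO_SEQUENCE` in both "nothing tracked" cases (no queue, or an empty queue). A minimal Java sketch of that lookup, assuming partitions are plain strings and batches are reduced to their base sequences; `FirstInFlightSketch`, `addInFlight`, `completeFirst`, and `reset` are hypothetical helpers, not `TransactionManager` methods.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical model of firstInFlightSequence; batches are reduced to base sequences.
public class FirstInFlightSketch {
    static final int NO_SEQUENCE = -1; // plays the role of RecordBatch.NO_SEQUENCE

    // In-flight base sequences per partition; the PriorityQueue keeps the lowest at the head.
    private final Map<String, PriorityQueue<Integer>> inflightBySequence = new HashMap<>();

    void addInFlight(String partition, int baseSequence) {
        inflightBySequence.computeIfAbsent(partition, p -> new PriorityQueue<Integer>()).offer(baseSequence);
    }

    // Models the lowest in-flight batch completing; the queue may become empty.
    void completeFirst(String partition) {
        PriorityQueue<Integer> queue = inflightBySequence.get(partition);
        if (queue != null)
            queue.poll();
    }

    // Models the in-flight bookkeeping being dropped on a producerId reset.
    void reset() {
        inflightBySequence.clear();
    }

    int firstInFlightSequence(String partition) {
        PriorityQueue<Integer> queue = inflightBySequence.get(partition);
        if (queue == null)
            return NO_SEQUENCE;       // partition never tracked, or tracking was reset
        Integer head = queue.peek();  // null when the queue exists but is empty
        return head == null ? NO_SEQUENCE : head;
    }

    public static void main(String[] args) {
        FirstInFlightSketch tm = new FirstInFlightSketch();
        tm.addInFlight("tp0", 10);
        tm.addInFlight("tp0", 0);
        tm.addInFlight("tp0", 5);
        System.out.println(tm.firstInFlightSequence("tp0")); // 0, regardless of insertion order
        System.out.println(tm.firstInFlightSequence("tp1")); // -1, nothing tracked
        tm.reset();
        System.out.println(tm.firstInFlightSequence("tp0")); // -1 after the reset
    }
}
```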

http://git-wip-us.apache.org/repos/asf/kafka/blob/105ab47e/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index a32688b..1ce8e5a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -1105,6 +1105,103 @@ public class SenderTest {
     }
 
     @Test
+    public void testResetOfProducerStateShouldAllowQueuedBatchesToDrain() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
+        setupWithTransactionState(transactionManager);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+        prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
+        assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);
+        sender.run(time.milliseconds());  // send request to tp1
+
+        assertFalse(successfulResponse.isDone());
+        client.respond(produceResponse(tp1, 10, Errors.NONE, -1));
+        sender.run(time.milliseconds());
+
+        assertTrue(successfulResponse.isDone());
+        assertEquals(10, successfulResponse.get().offset());
+
+        // Since the response came back for the old producer id, we shouldn't update the next sequence.
+        assertEquals(0, transactionManager.sequenceNumber(tp1).longValue());
+    }
+
+    @Test
+    public void testBatchesDrainedWithOldProducerIdShouldFailWithOutOfOrderSequenceOnSubsequentRetry() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
+        setupWithTransactionState(transactionManager);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+        prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
+        assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);
+        sender.run(time.milliseconds());  // send request to tp1 with the old producerId
+
+        assertFalse(successfulResponse.isDone());
+        // The response comes back with a retriable error.
+        client.respond(produceResponse(tp1, 0, Errors.NOT_LEADER_FOR_PARTITION, -1));
+        sender.run(time.milliseconds());
+
+        assertTrue(successfulResponse.isDone());
+        // Since the batch has an old producerId, it will not be retried yet again, but will be failed with a Fatal
+        // exception.
+        try {
+            successfulResponse.get();
+            fail("Should have raised an OutOfOrderSequenceException");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
+        }
+    }
+
+    @Test
     public void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
@@ -1799,12 +1896,31 @@ public class SenderTest {
         };
     }
 
+    class OffsetAndError {
+        long offset;
+        Errors error;
+        OffsetAndError(long offset, Errors error) {
+            this.offset = offset;
+            this.error = error;
+        }
+    }
+
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
         ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }
 
+    private ProduceResponse produceResponse(Map<TopicPartition, OffsetAndError> responses) {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> partResponses = new LinkedHashMap<>();
+        for (Map.Entry<TopicPartition, OffsetAndError> entry : responses.entrySet()) {
+            ProduceResponse.PartitionResponse response = new ProduceResponse.PartitionResponse(entry.getValue().error,
+                    entry.getValue().offset, RecordBatch.NO_TIMESTAMP, -1);
+            partResponses.put(entry.getKey(), response);
+        }
+        return new ProduceResponse(partResponses);
+    }
+
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
         return produceResponse(tp, offset, error, throttleTimeMs, -1L);
     }
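
The new `produceResponse(Map)` helper builds its per-partition responses in a `LinkedHashMap`, presumably so that the partitions are handled in insertion order: tp1's retriable `NOT_LEADER_FOR_PARTITION` before tp0's `OUT_OF_ORDER_SEQUENCE_NUMBER`, which resets the producer id. (A plain `HashMap` gives no ordering guarantee.) This hypothetical sketch shows only the ordering property; `ResponseOrderSketch` and `handlingOrder` are illustrative, and errors are plain strings rather than `Errors` values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ResponseOrderSketch {
    // Hypothetical stand-in for the test's OffsetAndError holder.
    static final class OffsetAndError {
        final long offset;
        final String error;
        OffsetAndError(long offset, String error) { this.offset = offset; this.error = error; }
    }

    // Returns "partition:error" entries in the order the map is iterated.
    static List<String> handlingOrder(Map<String, OffsetAndError> responses) {
        List<String> order = new ArrayList<>();
        for (Map.Entry<String, OffsetAndError> entry : responses.entrySet())
            order.add(entry.getKey() + ":" + entry.getValue().error);
        return order;
    }

    public static void main(String[] args) {
        Map<String, OffsetAndError> responses = new LinkedHashMap<>();
        responses.put("tp1", new OffsetAndError(-1, "NOT_LEADER_FOR_PARTITION"));
        responses.put("tp0", new OffsetAndError(-1, "OUT_OF_ORDER_SEQUENCE_NUMBER"));
        // LinkedHashMap iterates in insertion order, so tp1's retriable error comes first.
        System.out.println(handlingOrder(responses)); // [tp1:NOT_LEADER_FOR_PARTITION, tp0:OUT_OF_ORDER_SEQUENCE_NUMBER]
    }
}
```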

http://git-wip-us.apache.org/repos/asf/kafka/blob/105ab47e/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 81726c1..7c0a3da 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -40,8 +40,22 @@ class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 // ValidationType and its subtypes define the extent of the validation to perform on a given ProducerAppendInfo instance
 private[log] sealed trait ValidationType
 private[log] object ValidationType {
+
+  /**
+    * This indicates no validation should be performed on the incoming append. This is the case for all appends on
+    * a replica, as well as appends when the producer state is being built from the log.
+    */
   case object None extends ValidationType
+
+  /**
+    * We only validate the epoch (and not the sequence numbers) for offset commit requests coming from the transactional
+    * producer. These appends will not have sequence numbers, so we can't validate them.
+    */
   case object EpochOnly extends ValidationType
+
+  /**
+    * Perform the full validation. This should be used for regular produce requests coming to the leader.
+    */
   case object Full extends ValidationType
 }
 
@@ -148,9 +162,9 @@ private[log] class ProducerIdEntry(val producerId: Long, val batchMetadata: muta
 *                      be made against the latest append in the current entry. New appends will replace older appends
  *                      in the current entry so that the space overhead is constant.
  * @param validationType Indicates the extent of validation to perform on the appends on this instance. Offset commits
- *                       coming from the producer should have EpochOnlyValidation. Appends which aren't from a client
- *                       will not be validated at all, and should be set to NoValidation. All other appends should
- *                       have FullValidation.
+ *                       coming from the producer should have ValidationType.EpochOnly. Appends which aren't from a client
+ *                       should have ValidationType.None. Appends coming from a client for produce requests should have
+ *                       ValidationType.Full.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
                                       currentEntry: ProducerIdEntry,
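
The three `ValidationType` cases and the rules in the `ProducerAppendInfo` doc comment can be summarized with a small Java sketch. This is a hypothetical rendering, not the broker's Scala code: `forAppend`, `fromClient`, `isOffsetCommit`, `checksEpoch`, and `checksSequence` are illustrative names, and the sketch assumes `Full` checks both the epoch and the sequence numbers while `EpochOnly` checks only the epoch.

```java
// Hypothetical Java rendering of the ValidationType cases and when each applies.
public class ValidationTypeSketch {
    enum ValidationType { NONE, EPOCH_ONLY, FULL }

    // Which validation an append gets, following the doc comments in the diff.
    static ValidationType forAppend(boolean fromClient, boolean isOffsetCommit) {
        if (!fromClient)
            return ValidationType.NONE;        // replica appends, or rebuilding state from the log
        if (isOffsetCommit)
            return ValidationType.EPOCH_ONLY;  // transactional offset commits carry no sequence numbers
        return ValidationType.FULL;            // regular produce requests to the leader
    }

    static boolean checksEpoch(ValidationType t) { return t != ValidationType.NONE; }
    static boolean checksSequence(ValidationType t) { return t == ValidationType.FULL; }

    public static void main(String[] args) {
        for (ValidationType t : ValidationType.values())
            System.out.println(t + ": epoch=" + checksEpoch(t) + ", sequence=" + checksSequence(t));
        // NONE: epoch=false, sequence=false
        // EPOCH_ONLY: epoch=true, sequence=false
        // FULL: epoch=true, sequence=true
    }
}
```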