Posted to commits@kafka.apache.org by jg...@apache.org on 2017/09/14 23:11:40 UTC

[1/3] kafka git commit: KAFKA-5494; Enable idempotence with max.in.flight.requests.per.connection > 1

Repository: kafka
Updated Branches:
  refs/heads/trunk 8a5e86660 -> 5d2422258


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index bb41380..976bbd7 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -63,7 +63,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     append(stateManager, producerId, epoch, 1, 0L, 1L)
 
     // Duplicate sequence number (matches previous sequence number)
-    assertThrows[DuplicateSequenceNumberException] {
+    assertThrows[DuplicateSequenceException] {
       append(stateManager, producerId, epoch, 1, 0L, 1L)
     }
 
@@ -95,7 +95,8 @@ class ProducerStateManagerTest extends JUnitSuite {
 
     val lastEntry = maybeLastEntry.get
     assertEquals(epoch, lastEntry.producerEpoch)
-    assertEquals(0, lastEntry.firstSeq)
+
+    assertEquals(Int.MaxValue, lastEntry.firstSeq)
     assertEquals(0, lastEntry.lastSeq)
   }
 
@@ -122,7 +123,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(epoch, lastEntry.producerEpoch)
     assertEquals(sequence, lastEntry.firstSeq)
     assertEquals(sequence, lastEntry.lastSeq)
-    assertEquals(offset, lastEntry.lastOffset)
+    assertEquals(offset, lastEntry.lastDataOffset)
     assertEquals(offset, lastEntry.firstOffset)
   }
 
@@ -158,7 +159,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers = true,
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), validateSequenceNumbers = true,
       loadingFromLog = false)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
@@ -175,7 +176,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers = true,
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), validateSequenceNumbers = true,
       loadingFromLog = false)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
@@ -198,22 +199,22 @@ class ProducerStateManagerTest extends JUnitSuite {
 
     val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false)
     appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true)
-    var lastEntry = appendInfo.lastEntry
+    var lastEntry = appendInfo.latestEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
-    assertEquals(1, lastEntry.firstSeq)
+    assertEquals(0, lastEntry.firstSeq)
     assertEquals(5, lastEntry.lastSeq)
-    assertEquals(16L, lastEntry.firstOffset)
-    assertEquals(20L, lastEntry.lastOffset)
+    assertEquals(9L, lastEntry.firstOffset)
+    assertEquals(20L, lastEntry.lastDataOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
     appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, isTransactional = true)
-    lastEntry = appendInfo.lastEntry
+    lastEntry = appendInfo.latestEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
-    assertEquals(6, lastEntry.firstSeq)
+    assertEquals(0, lastEntry.firstSeq)
     assertEquals(10, lastEntry.lastSeq)
-    assertEquals(26L, lastEntry.firstOffset)
-    assertEquals(30L, lastEntry.lastOffset)
+    assertEquals(9L, lastEntry.firstOffset)
+    assertEquals(30L, lastEntry.lastDataOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
@@ -224,12 +225,13 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(40L, completedTxn.lastOffset)
     assertFalse(completedTxn.isAborted)
 
-    lastEntry = appendInfo.lastEntry
+    lastEntry = appendInfo.latestEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
-    assertEquals(10, lastEntry.firstSeq)
+    // verify that appending the transaction marker doesn't affect the metadata of the cached record batches.
+    assertEquals(0, lastEntry.firstSeq)
     assertEquals(10, lastEntry.lastSeq)
-    assertEquals(40L, lastEntry.firstOffset)
-    assertEquals(40L, lastEntry.lastOffset)
+    assertEquals(9L, lastEntry.firstOffset)
+    assertEquals(30L, lastEntry.lastDataOffset)
     assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch)
     assertEquals(None, lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
@@ -421,7 +423,7 @@ class ProducerStateManagerTest extends JUnitSuite {
 
     val maybeEntry = stateManager.lastEntry(anotherPid)
     assertTrue(maybeEntry.isDefined)
-    assertEquals(3L, maybeEntry.get.lastOffset)
+    assertEquals(3L, maybeEntry.get.lastDataOffset)
 
     stateManager.truncateHead(3)
     assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
@@ -452,7 +454,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val entry = stateManager.lastEntry(pid2)
     assertTrue(entry.isDefined)
     assertEquals(0, entry.get.lastSeq)
-    assertEquals(1L, entry.get.lastOffset)
+    assertEquals(1L, entry.get.lastDataOffset)
   }
 
   @Test
@@ -663,7 +665,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertFalse(snapshotToTruncate.exists())
 
     val loadedProducerState = reloadedStateManager.activeProducers(producerId)
-    assertEquals(0L, loadedProducerState.lastOffset)
+    assertEquals(0L, loadedProducerState.lastDataOffset)
   }
 
   private def appendEndTxnMarker(mapping: ProducerStateManager,
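These test updates reflect the broker-side half of the patch: the producer state entry now caches metadata for the last few batches per producer (not just the most recent one), so a duplicate of any cached batch can still be answered idempotently. A hypothetical, simplified illustration of that caching scheme follows; it is not the actual ProducerStateManager code, and all names are stand-ins:

    import java.util.ArrayDeque;
    import java.util.Deque;

    class CachedBatchMetadata {
        static final int MAX_CACHED_BATCHES = 5; // matches the retention described in the commit message

        static class BatchMetadata {
            final int firstSeq, lastSeq;
            final long lastOffset;
            BatchMetadata(int firstSeq, int lastSeq, long lastOffset) {
                this.firstSeq = firstSeq;
                this.lastSeq = lastSeq;
                this.lastOffset = lastOffset;
            }
        }

        private final Deque<BatchMetadata> cache = new ArrayDeque<>();

        void onAppend(BatchMetadata batch) {
            if (cache.size() == MAX_CACHED_BATCHES)
                cache.removeFirst();       // evict the oldest batch's metadata
            cache.addLast(batch);
        }

        // Returns cached metadata if (firstSeq, lastSeq) duplicates a recently
        // appended batch, or null if it is not a known duplicate.
        BatchMetadata findDuplicate(int firstSeq, int lastSeq) {
            for (BatchMetadata b : cache)
                if (b.firstSeq == firstSeq && b.lastSeq == lastSeq)
                    return b;
            return null;
        }
    }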

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a52c83c..902d1c3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1379,11 +1379,12 @@ object TestUtils extends Logging {
     records
   }
 
-  def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer]) = {
+  def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer], batchSize: Int = 16384) = {
     val props = new Properties()
     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
-    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")
     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 3903a3a..0d74645 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -149,6 +149,11 @@ public class TransactionalMessageCopier {
                 "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringSerializer");
+        // We set a small batch size to ensure that we have multiple inflight requests per transaction.
+        // If it is left at the default, each transaction will have only one batch per partition, and hence would not
+        // exercise the case with multiple inflights.
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
 
         return new KafkaProducer<>(props);
     }
@@ -252,7 +257,6 @@ public class TransactionalMessageCopier {
         maxMessages = Math.min(messagesRemaining(consumer, inputPartition), maxMessages);
         final boolean enableRandomAborts = parsedArgs.getBoolean("enableRandomAborts");
 
-
         producer.initTransactions();
 
         final AtomicBoolean isShuttingDown = new AtomicBoolean(false);


[3/3] kafka git commit: KAFKA-5494; Enable idempotence with max.in.flight.requests.per.connection > 1

Posted by jg...@apache.org.
KAFKA-5494; Enable idempotence with max.in.flight.requests.per.connection > 1

Here we introduce client and broker changes to support multiple inflight requests while still guaranteeing idempotence. Two major problems to be solved:

1. Sequence number management on the client when there are request failures. When a batch fails, future inflight batches will also fail with `OutOfOrderSequenceException`. This must be handled on the client with intelligent sequence reassignment, sketched below. We must also deal with the fatal failure of some batch: the future batches must get different sequence numbers when they come back.
2. On the broker, when we have multiple inflights, we can get duplicates of multiple old batches. With this patch, we retain the record metadata for the last 5 batches.
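To make the first problem concrete, here is a minimal, self-contained sketch of the client-side sequence bookkeeping, assuming a failed batch is unequivocally rejected by the broker. It is illustrative only: `Batch` and all field names are stand-ins, not the actual ProducerBatch/TransactionManager code.

    import java.util.ArrayDeque;
    import java.util.Deque;

    class SequenceBookkeeping {
        static class Batch {
            int baseSequence;
            final int recordCount;
            Batch(int baseSequence, int recordCount) {
                this.baseSequence = baseSequence;
                this.recordCount = recordCount;
            }
        }

        private int nextSequence = 0;
        private final Deque<Batch> inflight = new ArrayDeque<>();

        // Called when a batch is drained: assign it the next base sequence.
        Batch drain(int recordCount) {
            Batch b = new Batch(nextSequence, recordCount);
            nextSequence += recordCount;
            inflight.addLast(b);
            return b;
        }

        // Called when the broker fatally rejects `failed`: every in-flight batch
        // with a higher sequence must be shifted down by failed.recordCount, or
        // the broker would see a gap and raise OutOfOrderSequenceException.
        void onFatalFailure(Batch failed) {
            inflight.remove(failed);
            nextSequence -= failed.recordCount;
            for (Batch b : inflight)
                if (b.baseSequence > failed.baseSequence)
                    b.baseSequence -= failed.recordCount;
        }
    }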

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3743 from apurvam/KAFKA-5494-increase-max-in-flight-for-idempotent-producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d242225
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d242225
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d242225

Branch: refs/heads/trunk
Commit: 5d2422258cb975a137a42a4e08f03573c49a387e
Parents: 8a5e866
Author: Apurva Mehta <ap...@confluent.io>
Authored: Thu Sep 14 16:10:14 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Sep 14 16:10:19 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  18 +-
 .../producer/internals/ProducerBatch.java       |  36 +-
 .../producer/internals/RecordAccumulator.java   | 108 ++-
 .../clients/producer/internals/Sender.java      |  78 ++-
 .../producer/internals/TransactionManager.java  | 182 ++++-
 .../errors/DuplicateSequenceException.java      |  24 +
 .../DuplicateSequenceNumberException.java       |  24 -
 .../apache/kafka/common/protocol/Errors.java    |   4 +-
 .../common/record/MemoryRecordsBuilder.java     |  14 +
 .../org/apache/kafka/clients/MockClient.java    |  10 +
 .../clients/producer/internals/SenderTest.java  | 682 ++++++++++++++++++-
 .../internals/TransactionManagerTest.java       |  88 ++-
 core/src/main/scala/kafka/log/Log.scala         |  19 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |   6 +-
 .../scala/kafka/log/ProducerStateManager.scala  | 197 +++---
 .../scala/kafka/tools/DumpLogSegments.scala     |  12 +-
 .../kafka/api/ProducerBounceTest.scala          |   2 +-
 .../kafka/api/TransactionsBounceTest.scala      |  22 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   3 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  97 ++-
 .../kafka/log/ProducerStateManagerTest.scala    |  42 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   5 +-
 .../kafka/tools/TransactionalMessageCopier.java |   6 +-
 23 files changed, 1384 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 18248bb..b630d61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -365,7 +365,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             this.transactionManager = configureTransactionState(config, logContext, log);
             int retries = configureRetries(config, transactionManager != null, log);
-            int maxInflightRequests = configureInflightRequests(config, transactionManager != null, log);
+            int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
             short acks = configureAcks(config, transactionManager != null, log);
 
             this.apiVersions = new ApiVersions();
@@ -481,18 +481,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         return config.getInt(ProducerConfig.RETRIES_CONFIG);
     }
 
-    private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
-        boolean userConfiguredInflights = false;
-        if (config.originals().containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
-            userConfiguredInflights = true;
-        }
-        if (idempotenceEnabled && !userConfiguredInflights) {
-            log.info("Overriding the default {} to 1 since idempontence is enabled.", ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-            return 1;
-        }
-        if (idempotenceEnabled && config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) != 1) {
-            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 in order" +
-                    "to use the idempotent producer. Otherwise we cannot guarantee idempotence.");
+    private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
+        if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+                    " to use the idempotent producer.");
         }
         return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
     }
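With this change, enabling idempotence no longer pins max.in.flight.requests.per.connection to 1: any value up to 5 is accepted, and a larger value fails producer construction with a ConfigException. A minimal usage sketch, assuming a broker at the placeholder address localhost:9092:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class IdempotentProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            // Up to 5 inflight requests are now allowed with idempotence; 6 or
            // more would throw a ConfigException at construction time.
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send records as usual; idempotence is preserved across inflight requests
            }
        }
    }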

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index ee7d21a..93c843b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -75,6 +75,7 @@ public final class ProducerBatch {
     private long drainedMs;
     private String expiryErrorMessage;
     private boolean retry;
+    private boolean reopened = false;
 
     public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
         this(tp, recordsBuilder, now, false);
@@ -249,6 +250,15 @@ public final class ProducerBatch {
 
         produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, new RecordBatchTooLargeException());
         produceFuture.done();
+
+        if (hasSequence()) {
+            int sequence = baseSequence();
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId(), producerEpoch());
+            for (ProducerBatch newBatch : batches) {
+                newBatch.setProducerState(producerIdAndEpoch, sequence, isTransactional());
+                sequence += newBatch.recordCount;
+            }
+        }
         return batches;
     }
 
@@ -375,8 +385,12 @@ public final class ProducerBatch {
     }
 
     public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
-        recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
-                baseSequence, isTransactional);
+        recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
+    }
+
+    public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
+        reopened = true;
+        recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
     }
 
     /**
@@ -394,6 +408,7 @@ public final class ProducerBatch {
                                                        recordsBuilder.compressionType(),
                                                        (float) recordsBuilder.compressionRatio());
         }
+        reopened = false;
     }
 
     /**
@@ -434,4 +449,21 @@ public final class ProducerBatch {
     public short producerEpoch() {
         return recordsBuilder.producerEpoch();
     }
+
+    public int baseSequence() {
+        return recordsBuilder.baseSequence();
+    }
+
+    public boolean hasSequence() {
+        return baseSequence() != RecordBatch.NO_SEQUENCE;
+    }
+
+    public boolean isTransactional() {
+        return recordsBuilder.isTransactional();
+    }
+
+    public boolean sequenceHasBeenReset() {
+        return reopened;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 72d3b29..eb162be 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -252,7 +252,8 @@ public final class RecordAccumulator {
      *  and memory records built) in one of the following cases (whichever comes first): right before send,
      *  if it is expired, or when the producer is closed.
      */
-    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
+    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
+                                         Callback callback, Deque<ProducerBatch> deque) {
         ProducerBatch last = deque.peekLast();
         if (last != null) {
             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
@@ -260,7 +261,6 @@ public final class RecordAccumulator {
                 last.closeForRecordAppends();
             else
                 return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
-
         }
         return null;
     }
@@ -310,7 +310,10 @@ public final class RecordAccumulator {
         batch.reenqueued(now);
         Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
         synchronized (deque) {
-            deque.addFirst(batch);
+            if (transactionManager != null)
+                insertInSequenceOrder(deque, batch);
+            else
+                deque.addFirst(batch);
         }
     }
 
@@ -332,12 +335,71 @@ public final class RecordAccumulator {
             incomplete.add(batch);
             // We treat the newly split batches as if they are not even tried.
             synchronized (partitionDequeue) {
-                partitionDequeue.addFirst(batch);
+                if (transactionManager != null) {
+                    // We should track the newly created batches since they already have assigned sequences.
+                    transactionManager.addInFlightBatch(batch);
+                    insertInSequenceOrder(partitionDequeue, batch);
+                } else {
+                    partitionDequeue.addFirst(batch);
+                }
             }
         }
         return numSplitBatches;
     }
 
+    // The deque for the partition may have to be reordered in situations where leadership changes in between
+    // batch drains. Since the requests are on different connections, we no longer have any guarantees about ordering
+    // of the responses. Hence we will have to check if there is anything out of order and ensure the batch is queued
+    // in the correct sequence order.
+    //
+    // Note that this assumes that all the batches in the queue which have an assigned sequence also have the current
+    // producer id. We will not attempt to reorder messages if the producer id has changed.
+
+    private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
+        // When we are requeueing and have enabled idempotence, the reenqueued batch must always have a sequence.
+        if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
+            throw new IllegalStateException("Trying to reenqueue a batch which doesn't have a sequence even " +
+                    "though idempotence is enabled.");
+
+        if (transactionManager.nextBatchBySequence(batch.topicPartition) == null)
+            throw new IllegalStateException("We are reenqueueing a batch which is not tracked as part of the in flight " +
+                    "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());
+
+        // If there are no inflight batches being tracked by the transaction manager, it means that the producer
+        // id must have changed and the batches being re enqueued are from the old producer id. In this case
+        // we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence,
+        // or they will succeed.
+        if (batch.baseSequence() != transactionManager.nextBatchBySequence(batch.topicPartition).baseSequence()) {
+            // The incoming batch can't be inserted at the front of the queue without violating the sequence ordering.
+            // This means that the incoming batch should be placed somewhere further back.
+            // We need to find the right place for the incoming batch and insert it there.
+            // We will only enter this branch if we have multiple inflights sent to different brokers, perhaps
+            // because a leadership change occurred in between the drains. In this scenario, responses can come
+            // back out of order, requiring us to reorder the batches ourselves rather than relying on the
+            // implicit ordering guarantees of the network client, which hold only on a per-connection basis.
+
+            List<ProducerBatch> orderedBatches = new ArrayList<>();
+            while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence())
+                orderedBatches.add(deque.pollFirst());
+
+            log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " +
+                    "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
+            // Either we have reached a point where there are batches without a sequence (i.e. never been drained
+            // and are hence in order by default), or the batch at the front of the queue has a sequence greater
+            // than the incoming batch. This is the right place to add the incoming batch.
+            deque.addFirst(batch);
+
+            // Now we have to re-insert the previously queued batches in the right order.
+            for (int i = orderedBatches.size() - 1; i >= 0; --i) {
+                deque.addFirst(orderedBatches.get(i));
+            }
+
+            // At this point, the incoming batch has been queued in the correct place according to its sequence.
+        } else {
+            deque.addFirst(batch);
+        }
+    }
+
     /**
      * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
      * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
@@ -470,20 +532,42 @@ public final class RecordAccumulator {
                                                 break;
 
                                             isTransactional = transactionManager.isTransactional();
+
+                                            if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition))
+                                                // Don't drain any new batches while the state of previous sequence numbers
+                                                // is unknown. The previous batches would be unknown if they were aborted
+                                                // on the client after being sent to the broker at least once.
+                                                break;
+
+                                            if (first.hasSequence()
+                                                    && first.baseSequence() != transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
+                                                // If the queued batch already has an assigned sequence, then it is being
+                                                // retried. In this case, we wait until the next immediate batch is ready
+                                                // and drain that. We only move on when the next in line batch is complete (either successfully
+                                                // or due to a fatal broker error). This effectively reduces our
+                                                // in flight request count to 1.
+                                                break;
                                         }
 
                                         ProducerBatch batch = deque.pollFirst();
-                                        if (producerIdAndEpoch != null && !batch.inRetry()) {
-                                            // If the batch is in retry, then we should not change the producer id and
+                                        if (producerIdAndEpoch != null && !batch.hasSequence()) {
+                                            // If the batch already has an assigned sequence, then we should not change the producer id and
                                             // sequence number, since this may introduce duplicates. In particular,
                                             // the previous attempt may actually have been accepted, and if we change
                                             // the producer id and sequence here, this attempt will also be accepted,
                                             // causing a duplicate.
-                                            int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
-                                            log.debug("Assigning sequence number {} from producer {} to dequeued " +
-                                                            "batch from partition {} bound for {}.",
-                                                    sequenceNumber, producerIdAndEpoch, batch.topicPartition, node);
-                                            batch.setProducerState(producerIdAndEpoch, sequenceNumber, isTransactional);
+                                            //
+                                            // Additionally, we update the next sequence number bound for the partition,
+                                            // and also have the transaction manager track the batch so as to ensure
+                                            // that sequence ordering is maintained even if we receive out of order
+                                            // responses.
+                                            batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+                                            transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+                                            log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
+                                                            "{} being sent to partition {}", producerIdAndEpoch.producerId,
+                                                    producerIdAndEpoch.epoch, batch.baseSequence(), tp);
+
+                                            transactionManager.addInFlightBatch(batch);
                                         }
                                         batch.close();
                                         size += batch.records().sizeInBytes();
@@ -635,7 +719,7 @@ public final class RecordAccumulator {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             boolean aborted = false;
             synchronized (dq) {
-                if (!batch.isClosed()) {
+                if ((transactionManager != null && !batch.hasSequence()) || (transactionManager == null && !batch.isClosed())) {
                     aborted = true;
                     batch.abortRecordAppends();
                     dq.remove(batch);
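For reference, the sequence-ordered re-enqueue in insertInSequenceOrder above boils down to the following standalone sketch; `Batch` is a stand-in for ProducerBatch carrying only a base sequence:

    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    class SequenceOrderedRequeue {
        static class Batch {
            final int baseSequence;
            Batch(int baseSequence) { this.baseSequence = baseSequence; }
        }

        // Re-insert `batch` so that retried batches stay in ascending sequence
        // order at the front of the deque.
        static void insertInSequenceOrder(Deque<Batch> deque, Batch batch) {
            List<Batch> smaller = new ArrayList<>();
            // Pull off everything at the front with a smaller sequence ...
            while (deque.peekFirst() != null && deque.peekFirst().baseSequence < batch.baseSequence)
                smaller.add(deque.pollFirst());
            // ... place the incoming batch ...
            deque.addFirst(batch);
            // ... and push the smaller-sequence batches back in front of it.
            for (int i = smaller.size() - 1; i >= 0; --i)
                deque.addFirst(smaller.get(i));
        }
    }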

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 466bdd5..ecfad70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -200,10 +200,17 @@ public class Sender implements Runnable {
      */
     void run(long now) {
         if (transactionManager != null) {
+            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
+                // Check if the previous run expired batches which requires a reset of the producer state.
+                transactionManager.resetProducerId();
+
             if (!transactionManager.isTransactional()) {
                 // this is an idempotent producer, so make sure we have a producer id
                 maybeWaitForProducerId();
-            } else if (transactionManager.hasInFlightRequest() || maybeSendTransactionalRequest(now)) {
+            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
+                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
+                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
+            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                 // as long as there are outstanding transactional requests, we simply wait for them to return
                 client.poll(retryBackoffMs, now);
                 return;
@@ -228,6 +235,7 @@ public class Sender implements Runnable {
 
     private long sendProducerData(long now) {
         Cluster cluster = metadata.fetch();
+
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 
@@ -264,23 +272,17 @@ public class Sender implements Runnable {
         }
 
         List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
-        boolean needsTransactionStateReset = false;
         // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
         // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
         // we need to reset the producer id here.
         if (!expiredBatches.isEmpty())
             log.trace("Expired {} batches in accumulator", expiredBatches.size());
         for (ProducerBatch expiredBatch : expiredBatches) {
-            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
+            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
             if (transactionManager != null && expiredBatch.inRetry()) {
-                needsTransactionStateReset = true;
+                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
+                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
             }
-            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
-        }
-
-        if (needsTransactionStateReset) {
-            transactionManager.resetProducerId();
-            return 0;
         }
 
         sensors.updateProduceRequestMetrics(batches);
@@ -345,7 +347,7 @@ public class Sender implements Runnable {
 
                     ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
                             requestBuilder, now, true, nextRequestHandler);
-                    transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
+                    transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId());
                     log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
 
                     client.send(clientRequest, now);
@@ -422,6 +424,7 @@ public class Sender implements Runnable {
                         ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
                                 initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                         transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
+                        return;
                     } else if (error.exception() instanceof RetriableException) {
                         log.debug("Retriable error from InitProducerId response", error.message());
                     } else {
@@ -492,6 +495,7 @@ public class Sender implements Runnable {
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now) {
         Errors error = response.error;
+
         if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
                 (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
             // If the batch is too large, we split the batch and send the split batches again. We do not decrement
@@ -501,6 +505,8 @@ public class Sender implements Runnable {
                      batch.topicPartition,
                      this.retries - batch.attempts(),
                      error);
+            if (transactionManager != null)
+                transactionManager.removeInFlightBatch(batch);
             this.accumulator.splitAndReenqueue(batch);
             this.accumulator.deallocate(batch);
             this.sensors.recordBatchSplit();
@@ -517,14 +523,20 @@ public class Sender implements Runnable {
                     // If idempotence is enabled only retry the request if the current producer id is the same as
                     // the producer id of the batch.
                     log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
-                            transactionManager.sequenceNumber(batch.topicPartition));
+                            batch.baseSequence());
                     reenqueueBatch(batch, now);
                 } else {
                     failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                             "batch but the producer id changed from " + batch.producerId() + " to " +
-                            transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."));
-                    this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
+                            transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
                 }
+            } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
+                // If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
+                // the sequence of the current batch, and we haven't retained batch metadata on the broker to return
+                // the correct offset and timestamp.
+                //
+                // The only thing we can do is to return success to the user and not return a valid offset and timestamp.
+                completeBatch(batch, response);
             } else {
                 final RuntimeException exception;
                 if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
@@ -533,9 +545,10 @@ public class Sender implements Runnable {
                     exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
                 else
                     exception = error.exception();
-                // tell the user the result of their request
-                failBatch(batch, response, exception);
-                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
+                // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
+                // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
+                // thus it is not safe to reassign the sequence.
+                failBatch(batch, response, exception, batch.attempts() < this.retries);
             }
             if (error.exception() instanceof InvalidMetadataException) {
                 if (error.exception() instanceof UnknownTopicOrPartitionException)
@@ -559,21 +572,24 @@ public class Sender implements Runnable {
     }
 
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
-        if (transactionManager != null && transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
-            transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-            log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
-                    transactionManager.sequenceNumber(batch.topicPartition));
+        if (transactionManager != null) {
+            if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
+                transactionManager.maybeUpdateLastAckedSequence(batch.topicPartition, batch.baseSequence() + batch.recordCount - 1);
+                log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}", batch.producerId(), batch.topicPartition,
+                        transactionManager.lastAckedSequence(batch.topicPartition));
+            }
+            transactionManager.removeInFlightBatch(batch);
         }
 
         batch.done(response.baseOffset, response.logAppendTime, null);
         this.accumulator.deallocate(batch);
     }
 
-    private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) {
-        failBatch(batch, response.baseOffset, response.logAppendTime, exception);
+    private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception, boolean adjustSequenceNumbers) {
+        failBatch(batch, response.baseOffset, response.logAppendTime, exception, adjustSequenceNumbers);
     }
 
-    private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception) {
+    private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) {
         if (transactionManager != null) {
             if (exception instanceof OutOfOrderSequenceException
                     && !transactionManager.isTransactional()
@@ -594,16 +610,26 @@ public class Sender implements Runnable {
             } else if (transactionManager.isTransactional()) {
                 transactionManager.transitionToAbortableError(exception);
             }
+            transactionManager.removeInFlightBatch(batch);
+            if (adjustSequenceNumbers)
+                transactionManager.adjustSequencesDueToFailedBatch(batch);
         }
+
+        this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
         batch.done(baseOffset, logAppendTime, exception);
         this.accumulator.deallocate(batch);
     }
 
     /**
-     * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
+     * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed.
+     * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the future
+     * batches are certain to fail with an OutOfOrderSequence exception.
      */
     private boolean canRetry(ProducerBatch batch, Errors error) {
-        return batch.attempts() < this.retries && error.exception() instanceof RetriableException;
+        return batch.attempts() < this.retries &&
+                ((error.exception() instanceof RetriableException) ||
+                        (error.exception() instanceof OutOfOrderSequenceException
+                                && transactionManager.canRetryOutOfOrderSequenceException(batch)));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 05d943c..b2387a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -65,7 +65,27 @@ public class TransactionManager {
     private final String transactionalId;
     private final int transactionTimeoutMs;
 
-    private final Map<TopicPartition, Integer> sequenceNumbers;
+    // The base sequence of the next batch bound for a given partition.
+    private final Map<TopicPartition, Integer> nextSequence;
+
+    // The sequence of the last record of the last ack'd batch from the given partition. When there are no
+    // in flight requests for a partition, the lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
+    private final Map<TopicPartition, Integer> lastAckedSequence;
+
+    // If a batch bound for a partition expired locally after being sent at least once, the partition is considered
+    // to have an unresolved state. We keep track of such partitions here, and cannot assign any more sequence numbers
+    // for this partition until the unresolved state gets cleared. This may happen if other inflight batches returned
+    // successfully (indicating that the expired batch actually made it to the broker). If we don't get any successful
+    // responses for the partition once the inflight request count falls to zero, we reset the producer id and
+    // consequently clear this data structure as well.
+    private final Set<TopicPartition> partitionsWithUnresolvedSequences;
+
+    // Keep track of the in flight batches bound for a partition, ordered by sequence. This helps us to ensure that
+    // we continue to order batches by the sequence numbers even when the responses come back out of order during
+    // leader failover. We add a batch to the queue when it is drained, and remove it when the batch completes
+    // (either successfully or through a fatal failure).
+    private final Map<TopicPartition, PriorityQueue<ProducerBatch>> inflightBatchesBySequence;
+
     private final PriorityQueue<TxnRequestHandler> pendingRequests;
     private final Set<TopicPartition> newPartitionsInTransaction;
     private final Set<TopicPartition> pendingPartitionsInTransaction;
@@ -142,7 +162,8 @@ public class TransactionManager {
 
     public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
         this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
-        this.sequenceNumbers = new HashMap<>();
+        this.nextSequence = new HashMap<>();
+        this.lastAckedSequence = new HashMap<>();
         this.transactionalId = transactionalId;
         this.log = logContext.logger(TransactionManager.class);
         this.transactionTimeoutMs = transactionTimeoutMs;
@@ -159,6 +180,9 @@ public class TransactionManager {
             }
         });
 
+        this.partitionsWithUnresolvedSequences = new HashSet<>();
+        this.inflightBatchesBySequence = new HashMap<>();
+
         this.retryBackoffMs = retryBackoffMs;
     }
 
@@ -170,7 +194,7 @@ public class TransactionManager {
         ensureTransactional();
         transitionTo(State.INITIALIZING);
         setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
-        this.sequenceNumbers.clear();
+        this.nextSequence.clear();
         InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
         InitProducerIdHandler handler = new InitProducerIdHandler(builder);
         enqueueRequest(handler);
@@ -362,28 +386,157 @@ public class TransactionManager {
             throw new IllegalStateException("Cannot reset producer state for a transactional producer. " +
                     "You must either abort the ongoing transaction or reinitialize the transactional producer instead");
         setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
-        this.sequenceNumbers.clear();
+        this.nextSequence.clear();
+        this.lastAckedSequence.clear();
+        this.inflightBatchesBySequence.clear();
+        this.partitionsWithUnresolvedSequences.clear();
     }
 
     /**
      * Returns the next sequence number to be written to the given TopicPartition.
      */
     synchronized Integer sequenceNumber(TopicPartition topicPartition) {
-        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
+        Integer currentSequenceNumber = nextSequence.get(topicPartition);
         if (currentSequenceNumber == null) {
             currentSequenceNumber = 0;
-            sequenceNumbers.put(topicPartition, currentSequenceNumber);
+            nextSequence.put(topicPartition, currentSequenceNumber);
         }
         return currentSequenceNumber;
     }
 
     synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
-        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
+        Integer currentSequenceNumber = nextSequence.get(topicPartition);
         if (currentSequenceNumber == null)
             throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence.");
 
         currentSequenceNumber += increment;
-        sequenceNumbers.put(topicPartition, currentSequenceNumber);
+        nextSequence.put(topicPartition, currentSequenceNumber);
+    }
+
+    synchronized void addInFlightBatch(ProducerBatch batch) {
+        if (!batch.hasSequence())
+            throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition + " when sequence is not set.");
+        if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
+            inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
+                @Override
+                public int compare(ProducerBatch o1, ProducerBatch o2) {
+                    return o1.baseSequence() - o2.baseSequence();
+                }
+            }));
+        }
+        inflightBatchesBySequence.get(batch.topicPartition).offer(batch);
+    }
+
+
+    synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
+        PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(topicPartition);
+        if (queue == null)
+            return null;
+        return queue.peek();
+    }
+
+    synchronized void removeInFlightBatch(ProducerBatch batch) {
+        PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(batch.topicPartition);
+        if (queue == null)
+            return;
+        queue.remove(batch);
+    }
+
+    synchronized void maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) {
+        if (sequence > lastAckedSequence(topicPartition))
+            lastAckedSequence.put(topicPartition, sequence);
+    }
+
+    synchronized int lastAckedSequence(TopicPartition topicPartition) {
+        Integer currentLastAckedSequence = lastAckedSequence.get(topicPartition);
+        if (currentLastAckedSequence == null)
+            return -1;
+        return currentLastAckedSequence;
+    }
+
+    // If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted
+    // so that they don't fail with the OutOfOrderSequenceException.
+    //
+    // This method must only be called when we know that the batch in question has been unequivocally failed by the broker,
+    // i.e. it has received a confirmed fatal status code like 'Message Too Large' or something similar.
+    synchronized void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
+        if (!this.nextSequence.containsKey(batch.topicPartition))
+            // Sequence numbers are not being tracked for this partition. This could happen if the producer id was just
+            // reset due to a previous OutOfOrderSequenceException.
+            return;
+        log.debug("producerId: {}, send to partition {} failed fatally. Reducing future sequence numbers by {}",
+                batch.producerId(), batch.topicPartition, batch.recordCount);
+        int currentSequence = sequenceNumber(batch.topicPartition);
+        currentSequence -= batch.recordCount;
+        if (currentSequence < 0)
+            throw new IllegalStateException("Sequence number for partition " + batch.topicPartition + " is going to become negative : " + currentSequence);
+
+        setNextSequence(batch.topicPartition, currentSequence);
+
+        for (ProducerBatch inFlightBatch : inflightBatchesBySequence.get(batch.topicPartition)) {
+            if (inFlightBatch.baseSequence() < batch.baseSequence())
+                continue;
+            int newSequence = inFlightBatch.baseSequence() - batch.recordCount;
+            if (newSequence < 0)
+                throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence()
+                        + " for partition " + batch.topicPartition + " is going to become negative :" + newSequence);
+
+            log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", inFlightBatch.baseSequence(), batch.topicPartition, newSequence);
+            inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), newSequence, inFlightBatch.isTransactional());
+        }
+    }
+
+    synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
+        return inflightBatchesBySequence.containsKey(topicPartition) && !inflightBatchesBySequence.get(topicPartition).isEmpty();
+    }
+
+    synchronized boolean hasUnresolvedSequences() {
+        return !partitionsWithUnresolvedSequences.isEmpty();
+    }
+
+    synchronized boolean hasUnresolvedSequence(TopicPartition topicPartition) {
+        return partitionsWithUnresolvedSequences.contains(topicPartition);
+    }
+
+    synchronized void markSequenceUnresolved(TopicPartition topicPartition) {
+        log.debug("Marking partition {} unresolved", topicPartition);
+        partitionsWithUnresolvedSequences.add(topicPartition);
+    }
+
+    // Checks if there are any partitions with unresolved sequences which may now be resolved. Returns true if
+    // the producer id needs a reset, false otherwise.
+    synchronized boolean shouldResetProducerStateAfterResolvingSequences() {
+        if (isTransactional())
+            // We should not reset producer state if we are transactional. We will transition to a fatal error instead.
+            return false;
+        for (TopicPartition topicPartition : partitionsWithUnresolvedSequences) {
+            if (!hasInflightBatches(topicPartition)) {
+                // The partition has been fully drained. At this point, the last ack'd sequence should be one less than the
+                // next sequence destined for the partition. If so, the partition is fully resolved. If not, we should
+                // reset the sequence number if necessary.
+                if (isNextSequence(topicPartition, sequenceNumber(topicPartition))) {
+                    // This would happen when a batch was expired, but subsequent batches succeeded.
+                    partitionsWithUnresolvedSequences.remove(topicPartition);
+                } else {
+                    // We would enter this branch if all in flight batches were ultimately expired in the producer.
+                    log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
+                            "Going to reset producer state.", topicPartition, lastAckedSequence(topicPartition), sequenceNumber(topicPartition));
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    synchronized boolean isNextSequence(TopicPartition topicPartition, int sequence) {
+        return sequence - lastAckedSequence(topicPartition) == 1;
+    }
+
+    private synchronized void setNextSequence(TopicPartition topicPartition, int sequence) {
+        if (!nextSequence.containsKey(topicPartition) && sequence != 0)
+            throw new IllegalStateException("Trying to set the sequence number for " + topicPartition + " to " + sequence +
+            ", but the sequence number was never set for this partition.");
+        nextSequence.put(topicPartition, sequence);
     }
 
     synchronized TxnRequestHandler nextRequestHandler(boolean hasIncompleteBatches) {
@@ -441,15 +594,15 @@ public class TransactionManager {
         lookupCoordinator(request.coordinatorType(), request.coordinatorKey());
     }
 
-    void setInFlightRequestCorrelationId(int correlationId) {
+    void setInFlightTransactionalRequestCorrelationId(int correlationId) {
         inFlightRequestCorrelationId = correlationId;
     }
 
-    void clearInFlightRequestCorrelationId() {
+    void clearInFlightTransactionalRequestCorrelationId() {
         inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
-    boolean hasInFlightRequest() {
+    boolean hasInFlightTransactionalRequest() {
         return inFlightRequestCorrelationId != NO_INFLIGHT_REQUEST_CORRELATION_ID;
     }
 
@@ -479,6 +632,11 @@ public class TransactionManager {
         return currentState == State.IN_TRANSACTION || isCompleting() || hasAbortableError();
     }
 
+    synchronized boolean canRetryOutOfOrderSequenceException(ProducerBatch batch) {
+        return hasProducerId(batch.producerId()) && !hasUnresolvedSequence(batch.topicPartition) &&
+                (batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()));
+    }
+
     // visible for testing
     synchronized boolean isReady() {
         return isTransactional() && currentState == State.READY;
@@ -629,7 +787,7 @@ public class TransactionManager {
             if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
                 fatalError(new RuntimeException("Detected more than one in-flight transactional request."));
             } else {
-                clearInFlightRequestCorrelationId();
+                clearInFlightTransactionalRequestCorrelationId();
                 if (response.wasDisconnected()) {
                     log.debug("Disconnected from {}. Will retry.", response.destination());
                     if (this.needsCoordinator())

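The resolution rule above reduces to simple arithmetic over the last acknowledged sequence and the next sequence to be assigned. The following is a minimal standalone sketch of that rule, not the actual TransactionManager code; the map fields and the helper name are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    class SequenceResolutionSketch {
        private final Map<String, Integer> lastAckedSequence = new HashMap<>();
        private final Map<String, Integer> nextSequence = new HashMap<>();

        // Mirrors isNextSequence: a fully drained partition is resolved iff the
        // next sequence to assign is exactly one past the last acknowledged one.
        boolean needsProducerIdReset(String partition) {
            int acked = lastAckedSequence.getOrDefault(partition, -1);
            int next = nextSequence.getOrDefault(partition, 0);
            return next - acked != 1;
        }
    }

For example, lastAcked = 3 with next = 4 means every assigned sequence was acknowledged, so the partition is resolved; lastAcked = 3 with next = 7 means three in-flight batches expired unacknowledged, so the producer id must be reset.
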
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceException.java b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceException.java
new file mode 100644
index 0000000..11f81af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class DuplicateSequenceException extends ApiException {
+
+    public DuplicateSequenceException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
deleted file mode 100644
index 469ba98..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-public class DuplicateSequenceNumberException extends RetriableException {
-
-    public DuplicateSequenceNumberException(String message) {
-        super(message);
-    }
-}

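Note the change in the exception hierarchy: the removed DuplicateSequenceNumberException extended RetriableException, while its replacement DuplicateSequenceException extends ApiException. A duplicate now indicates the broker has already durably appended the batch, so the producer can complete the send as a success instead of retrying it. A hedged sketch of that classification (the enum and method are illustrative, not the Sender's actual API):

    enum Outcome { COMPLETE_AS_SUCCESS, RETRY, FAIL }

    final class DuplicateHandlingSketch {
        // A duplicate sequence means the write is already in the log, so the
        // batch completes without advancing the last acknowledged sequence.
        static Outcome classify(boolean isDuplicateSequence, boolean isRetriable) {
            if (isDuplicateSequence)
                return Outcome.COMPLETE_AS_SUCCESS;
            return isRetriable ? Outcome.RETRY : Outcome.FAIL;
        }
    }

The testCorrectHandlingOfDuplicateSequenceError case below exercises exactly this: the duplicate response completes the batch while leaving the last acknowledged sequence untouched.
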
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 9decef2..bbc3486 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
-import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
+import org.apache.kafka.common.errors.DuplicateSequenceException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
@@ -432,7 +432,7 @@ public enum Errors {
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {
-                return new DuplicateSequenceNumberException(message);
+                return new DuplicateSequenceException(message);
             }
         }),
     INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch. Either there is a newer producer " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 19d25d7..fc83134 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -279,6 +279,17 @@ public class MemoryRecordsBuilder {
         aborted = true;
     }
 
+    public void reopenAndRewriteProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
+        if (aborted)
+            throw new IllegalStateException("Should not reopen a batch which is already aborted.");
+        builtRecords = null;
+        this.producerId = producerId;
+        this.producerEpoch = producerEpoch;
+        this.baseSequence = baseSequence;
+        this.isTransactional = isTransactional;
+    }
+
     public void close() {
         if (aborted)
             throw new IllegalStateException("Cannot close MemoryRecordsBuilder as it has already been aborted");
@@ -766,4 +777,7 @@ public class MemoryRecordsBuilder {
         return this.producerEpoch;
     }
 
+    public int baseSequence() {
+        return this.baseSequence;
+    }
 }

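The new reopenAndRewriteProducerState hook exists so that an already-closed batch can be given a new sequence before it is resent, e.g. when an earlier batch fails fatally and the sequences of the batches behind it must slide down. A hedged usage sketch, assuming that closing the builder a second time rewrites the batch header with the new producer state (the producer id 4242 is illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    public class ReopenSketch {
        public static void main(String[] args) {
            MemoryRecordsBuilder builder = MemoryRecords.builder(
                    ByteBuffer.allocate(1024), CompressionType.NONE,
                    TimestampType.CREATE_TIME, 0L);
            builder.append(System.currentTimeMillis(), "k".getBytes(), "v".getBytes());
            builder.close();

            // Suppose an earlier batch failed fatally: this batch moves down
            // to sequence 0 before being resent.
            builder.reopenAndRewriteProducerState(4242L, (short) 0, 0, false);
            builder.close(); // rewrite the header with the new sequence
        }
    }
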
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 9960cce..71e32ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -224,6 +224,16 @@ public class MockClient implements KafkaClient {
         respond(response);
     }
 
+    // Utility method to enable out of order responses
+    public void respondToRequest(ClientRequest clientRequest, AbstractResponse response) {
+        requests.remove(clientRequest);
+        short version = clientRequest.requestBuilder().desiredOrLatestVersion();
+        responses.add(new ClientResponse(clientRequest.makeHeader(version), clientRequest.callback(), clientRequest.destination(),
+                clientRequest.createdTimeMs(), time.milliseconds(), false, null, response));
+    }
+
     public void respond(AbstractResponse response, boolean disconnected) {
         ClientRequest request = requests.remove();
         short version = request.requestBuilder().desiredOrLatestVersion();


[2/3] kafka git commit: KAFKA-5494; Enable idempotence with max.in.flight.requests.per.connection > 1

Posted by jg...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 6f98e52..26e3e6c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -66,6 +68,7 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -467,6 +470,657 @@ public class SenderTest {
         assertSendFailure(ClusterAuthorizationException.class);
     }
 
+
+    @Test
+    public void testIdempotenceWithMultipleInflights() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+        assertFalse(request2.isDone());
+
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
+        sender.run(time.milliseconds()); // receive response 1
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(client.hasInFlightRequests());
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+    }
+
+
+    @Test
+    public void testIdempotenceWithMultipleInflightsFirstFails() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.LEADER_NOT_AVAILABLE, -1L);
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
+
+        sender.run(time.milliseconds()); // resend request 0, receive response 1
+
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(1, client.inFlightRequestCount());
+
+        sender.run(time.milliseconds()); // Do nothing; we are reduced to one in-flight request during retries.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+        sender.run(time.milliseconds());  // receive response 0
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        assertFalse(request2.isDone());
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+
+        sender.run(time.milliseconds()); // send request 1
+        assertEquals(1, client.inFlightRequestCount());
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
+        sender.run(time.milliseconds());  // receive response 1
+
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+        assertFalse(client.hasInFlightRequests());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+    }
+
+    @Test
+    public void testIdempotenceWithMultipleInflightsWhereFirstFailsFatallyAndSequenceOfFutureBatchesIsAdjusted() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.MESSAGE_TOO_LARGE, -1L);
+
+        sender.run(time.milliseconds()); // receive response 0, should adjust sequences of future batches.
+
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised an error");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof RecordTooLargeException);
+        }
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
+
+        sender.run(time.milliseconds()); // receive response 1
+
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        sender.run(time.milliseconds()); // resend request 1
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+        sender.run(time.milliseconds());  // receive response 1
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        assertTrue(request2.isDone());
+        assertEquals(0, request2.get().offset());
+    }
+
+    @Test
+    public void testMustNotRetryOutOfOrderSequenceForNextBatch() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest with multiple messages.
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+
+        // make sure the next sequence number accounts for multi-message batches.
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0);
+
+        sender.run(time.milliseconds());
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        // This OutOfOrderSequence is fatal since it is returned for the batch succeeding the last acknowledged batch.
+        sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
+
+        sender.run(time.milliseconds());
+        assertTrue(request2.isDone());
+
+        try {
+            request2.get();
+            fail("Expected an OutOfOrderSequenceException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
+        }
+    }
+
+    @Test
+    public void testCorrectHandlingOfOutOfOrderResponses() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        ClientRequest firstClientRequest = client.requests().peek();
+        ClientRequest secondClientRequest = (ClientRequest) client.requests().toArray()[1];
+
+        client.respondToRequest(secondClientRequest, produceResponse(tp0, -1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1));
+
+        sender.run(time.milliseconds()); // receive response 1
+        Deque<ProducerBatch> queuedBatches = accumulator.batches().get(tp0);
+
+        // Make sure that we are queueing the second batch first.
+        assertEquals(1, queuedBatches.size());
+        assertEquals(1, queuedBatches.peekFirst().baseSequence());
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1));
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        // Make sure we requeued both batches in the correct order.
+        assertEquals(2, queuedBatches.size());
+        assertEquals(0, queuedBatches.peekFirst().baseSequence());
+        assertEquals(1, queuedBatches.peekLast().baseSequence());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+
+        sender.run(time.milliseconds()); // send request 0
+        assertEquals(1, client.inFlightRequestCount());
+        sender.run(time.milliseconds()); // don't do anything; only one in-flight request is allowed once we are retrying.
+
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Make sure that the requests are sent in order, even though the previous responses were not in order.
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+        sender.run(time.milliseconds());  // receive response 0
+        assertEquals(0, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+
+        sender.run(time.milliseconds()); // send request 1
+        assertEquals(1, client.inFlightRequestCount());
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
+        sender.run(time.milliseconds());  // receive response 1
+
+        assertFalse(client.hasInFlightRequests());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+    }
+
+    @Test
+    public void testCorrectHandlingOfOutOfOrderResponsesWhenSecondSucceeds() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        ClientRequest firstClientRequest = client.requests().peek();
+        ClientRequest secondClientRequest = (ClientRequest) client.requests().toArray()[1];
+
+        client.respondToRequest(secondClientRequest, produceResponse(tp0, 1, Errors.NONE, 1));
+
+        sender.run(time.milliseconds()); // receive response 1
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+        assertFalse(request1.isDone());
+        Deque<ProducerBatch> queuedBatches = accumulator.batches().get(tp0);
+
+        assertEquals(0, queuedBatches.size());
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+
+        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.REQUEST_TIMED_OUT, -1));
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        // Make sure we requeued both batches in the correct order.
+        assertEquals(1, queuedBatches.size());
+        assertEquals(0, queuedBatches.peekFirst().baseSequence());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        sender.run(time.milliseconds()); // resend request 0
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+
+        // Make sure we handle the out of order successful responses correctly.
+        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
+        sender.run(time.milliseconds());  // receive response 0
+        assertEquals(0, queuedBatches.size());
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertEquals(0, client.inFlightRequestCount());
+
+        assertFalse(client.hasInFlightRequests());
+        assertTrue(request1.isDone());
+        assertEquals(0, request1.get().offset());
+    }
+
+    @Test
+    public void testExpiryOfUnsentBatchesShouldNotCauseUnresolvedSequences() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Node node = this.cluster.nodes().get(0);
+        time.sleep(10000L);
+        client.disconnect(node.idString());
+        client.blackout(node, 10);
+
+        sender.run(time.milliseconds());
+
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised timeout exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        assertFalse(transactionManager.hasUnresolvedSequence(tp0));
+    }
+
+    @Test
+    public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+
+        assertEquals(2, client.inFlightRequestCount());
+
+        sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1);
+        sender.run(time.milliseconds());  // receive first response
+
+        Node node = this.cluster.nodes().get(0);
+        time.sleep(10000L);
+        client.disconnect(node.idString());
+        client.blackout(node, 10);
+
+        sender.run(time.milliseconds()); // now expire the first batch.
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised timeout exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+        // let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
+        Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+        time.sleep(20);
+
+        assertFalse(request2.isDone());
+
+        sender.run(time.milliseconds());  // send second request
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1);
+        sender.run(time.milliseconds()); // receive the second response; the third request shouldn't be sent since we are in an unresolved state.
+        assertTrue(request2.isDone());
+        assertEquals(1, request2.get().offset());
+        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+
+        assertEquals(1, batches.size());
+        assertFalse(batches.peekFirst().hasSequence());
+        assertFalse(client.hasInFlightRequests());
+        assertEquals(2L, transactionManager.sequenceNumber(tp0).longValue());
+        assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+
+        sender.run(time.milliseconds());  // clear the unresolved state, send the pending request.
+        assertFalse(transactionManager.hasUnresolvedSequence(tp0));
+        assertTrue(transactionManager.hasProducerId());
+        assertEquals(0, batches.size());
+        assertEquals(1, client.inFlightRequestCount());
+        assertFalse(request3.isDone());
+    }
+
+    @Test
+    public void testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+
+        assertEquals(2, client.inFlightRequestCount());
+
+        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+        sender.run(time.milliseconds());  // receive first response
+
+        Node node = this.cluster.nodes().get(0);
+        time.sleep(10000L);
+        client.disconnect(node.idString());
+        client.blackout(node, 10);
+
+        sender.run(time.milliseconds()); // now expire the first batch.
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised timeout exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+        // let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
+        Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+        time.sleep(20);
+
+        assertFalse(request2.isDone());
+
+        sender.run(time.milliseconds());  // send second request
+        sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
+        sender.run(time.milliseconds()); // receive the second response; the third request shouldn't be sent since we are in an unresolved state.
+        assertTrue(request2.isDone());
+
+        try {
+            request2.get();
+            fail("should have failed with an exception");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
+        }
+
+        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+
+        // The second request should not be requeued.
+        assertEquals(1, batches.size());
+        assertFalse(batches.peekFirst().hasSequence());
+        assertFalse(client.hasInFlightRequests());
+
+        // The producer state should be reset.
+        assertFalse(transactionManager.hasProducerId());
+        assertFalse(transactionManager.hasUnresolvedSequence(tp0));
+    }
+
+    @Test
+    public void testExpiryOfAllSentBatchesShouldCauseUnresolvedSequences() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // send request
+        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+        sender.run(time.milliseconds());  // receive response
+
+        assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
+
+        Node node = this.cluster.nodes().get(0);
+        time.sleep(10000L);
+        client.disconnect(node.idString());
+        client.blackout(node, 10);
+
+        sender.run(time.milliseconds()); // now expire the batch.
+
+        assertTrue(request1.isDone());
+        try {
+            request1.get();
+            fail("Should have raised timeout exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        assertTrue(transactionManager.hasUnresolvedSequence(tp0));
+        assertFalse(client.hasInFlightRequests());
+        Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
+        assertEquals(0, batches.size());
+        assertTrue(transactionManager.hasProducerId(producerId));
+        // We should now clear the old producerId and get a new one in a single run loop.
+        prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId(producerId + 1));
+    }
+
+    @Test
+    public void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        String nodeId = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+
+        // Send second ProduceRequest
+        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(-1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(request1.isDone());
+        assertFalse(request2.isDone());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        ClientRequest firstClientRequest = client.requests().peek();
+        ClientRequest secondClientRequest = (ClientRequest) client.requests().toArray()[1];
+
+        client.respondToRequest(secondClientRequest, produceResponse(tp0, 1, Errors.NONE, -1));
+
+        sender.run(time.milliseconds()); // receive response 1
+
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+
+        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.DUPLICATE_SEQUENCE_NUMBER, -1));
+
+        sender.run(time.milliseconds()); // receive response 0
+
+        // Make sure that the last ack'd sequence doesn't change.
+        assertEquals(1, transactionManager.lastAckedSequence(tp0));
+        assertFalse(client.hasInFlightRequests());
+    }
+
+    void sendIdempotentProducerResponse(final int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) {
+        client.respond(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                ProduceRequest produceRequest = (ProduceRequest) body;
+                assertTrue(produceRequest.isIdempotent());
+
+                MemoryRecords records = produceRequest.partitionRecordsOrFail().get(tp);
+                Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
+                RecordBatch firstBatch = batchIterator.next();
+                assertFalse(batchIterator.hasNext());
+                assertEquals(expectedSequence, firstBatch.baseSequence());
+
+                return true;
+            }
+        }, produceResponse(tp, responseOffset, responseError, 0));
+    }
+
     @Test
     public void testClusterAuthorizationExceptionInProduceRequest() throws Exception {
         final long producerId = 343434L;
@@ -604,7 +1258,8 @@ public class SenderTest {
 
         sender.run(time.milliseconds());  // receive response
         assertTrue(responseFuture.isDone());
-        assertEquals((long) transactionManager.sequenceNumber(tp0), 1L);
+        assertEquals(0L, (long) transactionManager.lastAckedSequence(tp0));
+        assertEquals(1L, (long) transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -632,6 +1287,7 @@ public class SenderTest {
         assertEquals(0, client.inFlightRequestCount());
         assertFalse("Client ready status should be false", client.isReady(node, 0L));
 
+        transactionManager.resetProducerId();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0));
         sender.run(time.milliseconds()); // receive error
         sender.run(time.milliseconds()); // reconnect
@@ -642,7 +1298,7 @@ public class SenderTest {
         assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
 
         assertTrue(responseFuture.isDone());
-        assertEquals((long) transactionManager.sequenceNumber(tp0), 0L);
+        assertEquals(0, (long) transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -720,7 +1376,8 @@ public class SenderTest {
                     accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
-            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue());
+
+            assertEquals("The next sequence should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             String id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             Node node = new Node(Integer.valueOf(id), "localhost", 0);
@@ -731,11 +1388,12 @@ public class SenderTest {
             responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
             client.respond(new ProduceResponse(responseMap));
             sender.run(time.milliseconds()); // split and reenqueue
+            assertEquals("The next sequence should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             // The compression ratio should have been improved once.
             assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
                     CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);
-            sender.run(time.milliseconds()); // send produce request
-            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue());
+            sender.run(time.milliseconds()); // send the first produce request
+            assertEquals("The next sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
             assertFalse("The future shouldn't have been done.", f1.isDone());
             assertFalse("The future shouldn't have been done.", f2.isDone());
             id = client.requests().peek().destination();
@@ -750,10 +1408,11 @@ public class SenderTest {
 
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f1.isDone());
-            assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp).longValue());
+            assertEquals("The next sequence number should still be 2", 2, txnManager.sequenceNumber(tp).longValue());
+            assertEquals("The last ack'd sequence number should be 0", 0, txnManager.lastAckedSequence(tp));
             assertFalse("The future shouldn't have been done.", f2.isDone());
             assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
-            sender.run(time.milliseconds()); // send produce request
+            sender.run(time.milliseconds()); // send the second produce request
             id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             node = new Node(Integer.valueOf(id), "localhost", 0);
@@ -766,7 +1425,8 @@ public class SenderTest {
 
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f2.isDone());
-            assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
+            assertEquals("The next sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
+            assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp));
             assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
             assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
 
@@ -828,9 +1488,11 @@ public class SenderTest {
         this.metrics = new Metrics(metricConfig, time);
         this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
                 apiVersions, transactionManager);
+
         this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet());
-        this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
-                MAX_RETRIES, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                Integer.MAX_VALUE, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
     }
 

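The setup change above (retries raised to Integer.MAX_VALUE and message-order guarantees no longer forced) reflects the headline of KAFKA-5494: enabling idempotence no longer requires max.in.flight.requests.per.connection = 1. A hedged configuration sketch of what this enables (the broker address is illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class IdempotentProducerConfigSketch {
        public static Properties props() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            // After this change, values greater than 1 (up to 5) keep both
            // idempotence and per-partition ordering.
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
            return props;
        }
    }
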
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 53bba1c..28f9c82 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1171,6 +1171,7 @@ public class TransactionManagerTest {
         assertFutureFailed(unauthorizedTopicProduceFuture);
         assertTrue(authorizedTopicProduceFuture.isDone());
         assertNotNull(authorizedTopicProduceFuture.get());
+        assertTrue(authorizedTopicProduceFuture.isDone());
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
         transactionManager.beginAbort();
@@ -1481,14 +1482,14 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // until the produce future returns, we will not send EndTxn
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // now the produce response returns
@@ -1497,14 +1498,14 @@ public class TransactionManagerTest {
         assertTrue(responseFuture.isDone());
         assertFalse(accumulator.hasUndrained());
         assertFalse(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
 
         // now we send EndTxn
         sender.run(time.milliseconds());
-        assertTrue(transactionManager.hasInFlightRequest());
+        assertTrue(transactionManager.hasInFlightTransactionalRequest());
         sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
         sender.run(time.milliseconds());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertTrue(transactionManager.isReady());
     }
 
@@ -1529,21 +1530,21 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
 
         // now we begin the commit with the produce request still pending
         transactionManager.beginCommit();
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // until the produce future returns, we will not send EndTxn
         sender.run(time.milliseconds());
         assertFalse(accumulator.hasUndrained());
         assertTrue(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertFalse(responseFuture.isDone());
 
         // now the produce response returns
@@ -1552,14 +1553,14 @@ public class TransactionManagerTest {
         assertTrue(responseFuture.isDone());
         assertFalse(accumulator.hasUndrained());
         assertFalse(accumulator.hasIncomplete());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
 
         // now we send EndTxn
         sender.run(time.milliseconds());
-        assertTrue(transactionManager.hasInFlightRequest());
+        assertTrue(transactionManager.hasInFlightTransactionalRequest());
         sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
         sender.run(time.milliseconds());
-        assertFalse(transactionManager.hasInFlightRequest());
+        assertFalse(transactionManager.hasInFlightTransactionalRequest());
         assertTrue(transactionManager.isReady());
     }
 
@@ -1960,17 +1961,17 @@ public class TransactionManagerTest {
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
-        sender.run(time.milliseconds()); // AddPartitions
+        sender.run(time.milliseconds()); // Add partitions
         sender.run(time.milliseconds()); // Produce
 
         assertFalse(responseFuture.isDone());
 
         transactionManager.transitionToAbortableError(new KafkaException());
         prepareProduceResponse(Errors.NONE, pid, epoch);
-
         sender.run(time.milliseconds());
+
         assertTrue(responseFuture.isDone());
-        assertNotNull(responseFuture.get());
+        assertNotNull(responseFuture.get()); // should throw the exception which caused the transaction to be aborted.
     }
 
     @Test
@@ -2151,6 +2152,65 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.transactionContainsPartition(tp0));
     }
 
+    @Test
+    public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException, ExecutionException {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(responseFuture.isDone());
+
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+
+        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
+        sender.run(time.milliseconds());  // send addPartitions.
+        // Check that only addPartitions was sent.
+        assertTrue(transactionManager.transactionContainsPartition(tp0));
+        assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
+
+        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
+        sender.run(time.milliseconds());  // send the produce request.
+
+        assertFalse(responseFuture.isDone());
+
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+        // Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
+        time.sleep(10000);
+        // Disconnect the target node for the pending produce request. This will ensure that sender will try to
+        // expire the batch.
+        Node clusterNode = this.cluster.nodes().get(0);
+        client.disconnect(clusterNode.idString());
+        client.blackout(clusterNode, 100);
+
+        sender.run(time.milliseconds());  // We should try to flush the produce, but expire it instead without sending anything.
+        assertTrue(responseFuture.isDone());
+
+        try {
+            // make sure the produce was expired.
+            responseFuture.get();
+            fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        sender.run(time.milliseconds());  // Transition to fatal error since we have unresolved batches.
+        sender.run(time.milliseconds());  // Fail the queued transactional requests
+
+        assertTrue(commitResult.isCompleted());
+        assertFalse(commitResult.isSuccessful());  // the commit should have been dropped.
+
+        assertTrue(transactionManager.hasFatalError());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
         final long pid = 1L;
         final short epoch = 1;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d3de24d..d98f443 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -514,8 +514,10 @@ class Log(@volatile var dir: File,
     completedTxns.foreach(producerStateManager.completeTxn)
   }
 
-  private[log] def activeProducers: Map[Long, ProducerIdEntry] = lock synchronized {
-    producerStateManager.activeProducers
+  private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized {
+    producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
+      (producerId, producerIdEntry.lastSeq)
+    }
   }
 
   /**
@@ -765,17 +767,18 @@ class Log(@volatile var dir: File,
   }
 
   private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
-  (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[ProducerIdEntry]) = {
+  (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
     val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
     for (batch <- records.batches.asScala if batch.hasProducerId) {
       val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
 
-      // if this is a client produce request, there will be only one batch. If that batch matches
-      // the last appended entry for that producer, then this request is a duplicate and we return
-      // the last appended entry to the client.
-      if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch)))
-        return (updatedProducers, completedTxns.toList, maybeLastEntry)
+      // if this is a client produce request, the incoming batch may duplicate any of the last 5 appended
+      // batches. If we find a duplicate, we return the metadata of the originally appended batch to the client.
+      if (isFromClient)
+        maybeLastEntry.flatMap(_.duplicateOf(batch)).foreach { duplicate =>
+          return (updatedProducers, completedTxns.toList, Some(duplicate))
+        }
 
       val maybeCompletedTxn = updateProducers(batch, updatedProducers, loadingFromLog = false)
       maybeCompletedTxn.foreach(completedTxns += _)
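
For intuition, here is a minimal, self-contained Scala sketch of this duplicate check (CachedBatch,
DuplicateCheckSketch and findDuplicate are illustrative names, not Kafka's): the broker caches metadata
for the last few appended batches per producer and answers a retry by matching the exact sequence range.

    // Sketch: duplicate detection over a bounded cache of recently appended
    // batch metadata, keyed by the exact (firstSeq, lastSeq) range.
    case class CachedBatch(firstSeq: Int, lastSeq: Int, lastOffset: Long)

    object DuplicateCheckSketch {
      def findDuplicate(cache: Seq[CachedBatch], firstSeq: Int, lastSeq: Int): Option[CachedBatch] =
        cache.find(b => b.firstSeq == firstSeq && b.lastSeq == lastSeq)

      def main(args: Array[String]): Unit = {
        val cache = (0 until 5).map(i => CachedBatch(i, i, 100L + i))
        assert(findDuplicate(cache, 2, 2).isDefined)  // retried batch: return the original metadata
        assert(findDuplicate(cache, 9, 9).isEmpty)    // unseen range: proceed with the append
      }
    }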

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4f53b41..61dd0fc 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -450,7 +450,7 @@ private[log] class Cleaner(val id: Int,
         info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
           .format(startOffset, log.name, new Date(oldSegmentOpt.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
         cleanInto(log.topicPartition, oldSegmentOpt, cleaned, map, retainDeletes, log.config.maxMessageSize, transactionMetadata,
-          log.activeProducers, stats)
+          log.activeProducersWithLastSequence, stats)
 
         currentSegmentOpt = nextSegmentOpt
       }
@@ -503,7 +503,7 @@ private[log] class Cleaner(val id: Int,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
-                             activeProducers: Map[Long, ProducerIdEntry],
+                             activeProducers: Map[Long, Int],
                              stats: CleanerStats) {
     val logCleanerFilter = new RecordFilter {
       var discardBatchRecords: Boolean = _
@@ -515,7 +515,7 @@ private[log] class Cleaner(val id: Int,
 
         // check if the batch contains the last sequence number for the producer. if so, we cannot
         // remove the batch just yet or the producer may see an out of sequence error.
-        if (batch.hasProducerId && activeProducers.get(batch.producerId).exists(_.lastSeq == batch.lastSequence))
+        if (batch.hasProducerId && activeProducers.get(batch.producerId).contains(batch.lastSequence))
           BatchRetention.RETAIN_EMPTY
         else if (discardBatchRecords)
           BatchRetention.DELETE
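
This retention rule is what preserves idempotence across cleaning: if the batch holding a producer's
last sequence number were removed entirely, a reconnecting producer would see an out-of-sequence error.
A simplified sketch of the decision (illustrative names, not the cleaner's actual types):

    object CleanerRetentionSketch {
      sealed trait Retention
      case object RetainEmpty extends Retention  // keep the empty batch shell to preserve the last sequence
      case object Delete extends Retention

      def decide(lastSeqByProducer: Map[Long, Int], producerId: Long, batchLastSeq: Int): Retention =
        if (lastSeqByProducer.get(producerId).contains(batchLastSeq)) RetainEmpty
        else Delete

      def main(args: Array[String]): Unit = {
        val lastSeqs = Map(42L -> 7)
        assert(decide(lastSeqs, 42L, 7) == RetainEmpty)  // batch holds producer 42's last sequence
        assert(decide(lastSeqs, 42L, 3) == Delete)       // older batch: safe to remove
      }
    }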

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index fc2e340..4c3d1a1 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -17,7 +17,7 @@
 package kafka.log
 
 import java.io._
-import java.nio.ByteBuffer
+import java.nio.{BufferUnderflowException, ByteBuffer}
 import java.nio.file.Files
 
 import kafka.common.KafkaException
@@ -48,33 +48,82 @@ private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffset
 }
 
 private[log] object ProducerIdEntry {
-  val Empty = ProducerIdEntry(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-    -1, 0, RecordBatch.NO_TIMESTAMP, -1, None)
+  private[log] val NumBatchesToRetain = 5
+  def empty(producerId: Long) = new ProducerIdEntry(producerId, mutable.Queue[BatchMetadata](), RecordBatch.NO_PRODUCER_EPOCH, -1, None)
 }
 
-private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short, lastSeq: Int, lastOffset: Long,
-                                        offsetDelta: Int, timestamp: Long, coordinatorEpoch: Int,
-                                        currentTxnFirstOffset: Option[Long]) {
-  def firstSeq: Int = lastSeq - offsetDelta
-  def firstOffset: Long = lastOffset - offsetDelta
+private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
+  def firstSeq: Int = lastSeq - offsetDelta
+  def firstOffset: Long = lastOffset - offsetDelta
 
-  def isDuplicate(batch: RecordBatch): Boolean = {
-    batch.producerEpoch == producerEpoch &&
-      batch.baseSequence == firstSeq &&
-      batch.lastSequence == lastSeq
+  override def toString: String = {
+    "BatchMetadata(" +
+      s"firstSeq=$firstSeq, " +
+      s"lastSeq=$lastSeq, " +
+      s"firstOffset=$firstOffset, " +
+      s"lastOffset=$lastOffset, " +
+      s"timestamp=$timestamp)"
+  }
+}
+
+// The batchMetadata queue is ordered such that the batch with the lowest sequence is at the head and the
+// batch with the highest sequence is at the tail. We retain at most ProducerIdEntry.NumBatchesToRetain
+// elements in the queue. When the queue is at capacity, we remove the head element to make space for the incoming batch.
+private[log] class ProducerIdEntry(val producerId: Long, val batchMetadata: mutable.Queue[BatchMetadata],
+                                   var producerEpoch: Short, var coordinatorEpoch: Int,
+                                   var currentTxnFirstOffset: Option[Long]) {
+
+  def firstSeq: Int = if (batchMetadata.isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.front.firstSeq
+  def firstOffset: Long = if (batchMetadata.isEmpty) -1L else batchMetadata.front.firstOffset
+
+  def lastSeq: Int = if (batchMetadata.isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.last.lastSeq
+  def lastDataOffset: Long = if (batchMetadata.isEmpty) -1L else batchMetadata.last.lastOffset
+  def lastTimestamp: Long = if (batchMetadata.isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
+  def lastOffsetDelta: Int = if (batchMetadata.isEmpty) 0 else batchMetadata.last.offsetDelta
+
+  def addBatchMetadata(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long): Unit = {
+    maybeUpdateEpoch(producerEpoch)
+
+    if (batchMetadata.size == ProducerIdEntry.NumBatchesToRetain)
+      batchMetadata.dequeue()
+
+    batchMetadata.enqueue(BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp))
+  }
+
+  def maybeUpdateEpoch(producerEpoch: Short): Boolean = {
+    if (this.producerEpoch != producerEpoch) {
+      batchMetadata.clear()
+      this.producerEpoch = producerEpoch
+      true
+    } else {
+      false
+    }
+  }
+
+  // Note: mutable.Queue#dropWhile returns a copy rather than mutating the queue, so dequeue in place instead.
+  def removeBatchesOlderThan(offset: Long): Unit = {
+    while (batchMetadata.nonEmpty && batchMetadata.front.lastOffset < offset)
+      batchMetadata.dequeue()
+  }
+
+  def duplicateOf(batch: RecordBatch): Option[BatchMetadata] = {
+    if (batch.producerEpoch() != producerEpoch)
+      None
+    else
+      batchWithSequenceRange(batch.baseSequence(), batch.lastSequence())
+  }
+
+  // Return the batch metadata of the cached batch having the exact sequence range, if any.
+  def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
+    batchMetadata.find { metadata =>
+      firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
+    }
   }
 
   override def toString: String = {
     "ProducerIdEntry(" +
       s"producerId=$producerId, " +
       s"producerEpoch=$producerEpoch, " +
-      s"firstSequence=$firstSeq, " +
-      s"lastSequence=$lastSeq, " +
-      s"firstOffset=$firstOffset, " +
-      s"lastOffset=$lastOffset, " +
-      s"timestamp=$timestamp, " +
       s"currentTxnFirstOffset=$currentTxnFirstOffset, " +
-      s"coordinatorEpoch=$coordinatorEpoch)"
+      s"coordinatorEpoch=$coordinatorEpoch, " +
+      s"batchMetadata=$batchMetadata"
   }
 }
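
The eviction behavior of the bounded queue can be seen in a small standalone sketch (a simplified
stand-in that keeps only each batch's lastSeq; BoundedBatchQueueSketch is an illustrative name):

    import scala.collection.mutable

    object BoundedBatchQueueSketch {
      val NumBatchesToRetain = 5

      def main(args: Array[String]): Unit = {
        val queue = mutable.Queue.empty[Int]
        (0 to 6).foreach { lastSeq =>
          if (queue.size == NumBatchesToRetain)
            queue.dequeue()       // evict the oldest batch at the head
          queue.enqueue(lastSeq)  // newest batch goes to the tail
        }
        // After appending sequences 0..6, only the last five (2..6) remain cached.
        assert(queue.toList == List(2, 3, 4, 5, 6))
      }
    }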
 
@@ -85,8 +134,10 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short,
  * as the incoming records are validated.
  *
  * @param producerId The id of the producer appending to the log
- * @param initialEntry The last entry associated with the producer id. Validation of the first append will be
- *                     based off of this entry initially
+ * @param currentEntry  The current entry associated with the producer id which contains metadata for a fixed number of
+ *                      the most recent appends made by the producer. Validation of the first incoming append will
+ *                      be made against the latest append in the current entry. New appends will replace older appends
+ *                      in the current entry so that the space overhead is constant.
  * @param validateSequenceNumbers Whether or not sequence numbers should be validated. The only current use
  *                                of this is the consumer offsets topic which uses producer ids from incoming
  *                                TxnOffsetCommit, but has no sequence number to validate and does not depend
@@ -98,48 +149,46 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short,
  *                       retention enforcement.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
-                                      initialEntry: ProducerIdEntry,
+                                      currentEntry: ProducerIdEntry,
                                       validateSequenceNumbers: Boolean,
                                       loadingFromLog: Boolean) {
-  private var producerEpoch = initialEntry.producerEpoch
-  private var firstSeq = initialEntry.firstSeq
-  private var lastSeq = initialEntry.lastSeq
-  private var lastOffset = initialEntry.lastOffset
-  private var maxTimestamp = initialEntry.timestamp
-  private var currentTxnFirstOffset = initialEntry.currentTxnFirstOffset
-  private var coordinatorEpoch = initialEntry.coordinatorEpoch
+
   private val transactions = ListBuffer.empty[TxnMetadata]
 
   private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
     if (isFenced(producerEpoch)) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
-        s"with a newer epoch. $producerEpoch (request epoch), ${this.producerEpoch} (server epoch)")
+        s"with a newer epoch. $producerEpoch (request epoch), ${currentEntry.producerEpoch} (server epoch)")
     } else if (validateSequenceNumbers) {
-      if (producerEpoch != this.producerEpoch) {
+      if (producerEpoch != currentEntry.producerEpoch) {
         if (firstSeq != 0)
           throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
             s"(request epoch), $firstSeq (seq. number)")
-      } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
+      } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
         // the epoch was bumped by a control record, so we expect the sequence number to be reset
         throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +
           s"(incoming seq. number), but expected 0")
-      } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
-        throw new DuplicateSequenceNumberException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
-          s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, lastEntry.lastSeq): " +
-          s"(${this.firstSeq}, ${this.lastSeq}).")
+      } else if (isDuplicate(firstSeq, lastSeq)) {
+        throw new DuplicateSequenceException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
+          s"incomingBatch.lastSeq): ($firstSeq, $lastSeq).")
       } else if (!inSequence(firstSeq, lastSeq)) {
         throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " +
-          s"(incoming seq. number), ${this.lastSeq} (current end sequence number)")
+          s"(incoming seq. number), ${currentEntry.lastSeq} (current end sequence number)")
       }
     }
   }
 
+  private def isDuplicate(firstSeq: Int, lastSeq: Int): Boolean = {
+    ((lastSeq != 0 && currentEntry.firstSeq != Int.MaxValue && lastSeq < currentEntry.firstSeq)
+      || currentEntry.batchWithSequenceRange(firstSeq, lastSeq).isDefined)
+  }
+
   private def inSequence(firstSeq: Int, lastSeq: Int): Boolean = {
-    firstSeq == this.lastSeq + 1L || (firstSeq == 0 && this.lastSeq == Int.MaxValue)
+    firstSeq == currentEntry.lastSeq + 1L || (firstSeq == 0 && currentEntry.lastSeq == Int.MaxValue)
   }
 
   private def isFenced(producerEpoch: Short): Boolean = {
-    producerEpoch < this.producerEpoch
+    producerEpoch < currentEntry.producerEpoch
   }
 
   def append(batch: RecordBatch): Option[CompletedTxn] = {
@@ -166,18 +215,14 @@ private[log] class ProducerAppendInfo(val producerId: Long,
       // will generally have removed the beginning entries from each producer id
       validateAppend(epoch, firstSeq, lastSeq)
 
-    this.producerEpoch = epoch
-    this.firstSeq = firstSeq
-    this.lastSeq = lastSeq
-    this.maxTimestamp = lastTimestamp
-    this.lastOffset = lastOffset
+    currentEntry.addBatchMetadata(epoch, lastSeq, lastOffset, lastSeq - firstSeq, lastTimestamp)
 
-    if (currentTxnFirstOffset.isDefined && !isTransactional)
+    if (currentEntry.currentTxnFirstOffset.isDefined && !isTransactional)
       throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId")
 
-    if (isTransactional && currentTxnFirstOffset.isEmpty) {
+    if (isTransactional && currentEntry.currentTxnFirstOffset.isEmpty) {
       val firstOffset = lastOffset - (lastSeq - firstSeq)
-      currentTxnFirstOffset = Some(firstOffset)
+      currentEntry.currentTxnFirstOffset = Some(firstOffset)
       transactions += new TxnMetadata(producerId, firstOffset)
     }
   }
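
Note that inSequence also covers sequence-number wraparound: after a batch ending at Int.MaxValue,
the next valid first sequence is 0. A self-contained illustration of just this predicate (the
surrounding epoch and duplicate validation is omitted):

    object SequenceCheckSketch {
      // Mirrors the predicate above; the 1L promotes the sum to Long so it cannot overflow.
      def inSequence(incomingFirstSeq: Int, lastAppendedSeq: Int): Boolean =
        incomingFirstSeq == lastAppendedSeq + 1L ||
          (incomingFirstSeq == 0 && lastAppendedSeq == Int.MaxValue)

      def main(args: Array[String]): Unit = {
        assert(inSequence(5, 4))             // the normal case
        assert(inSequence(0, Int.MaxValue))  // wraparound after exhausting the sequence space
        assert(!inSequence(7, 5))            // a gap: raises OutOfOrderSequenceException
      }
    }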
@@ -187,44 +232,27 @@ private[log] class ProducerAppendInfo(val producerId: Long,
                          offset: Long,
                          timestamp: Long): CompletedTxn = {
     if (isFenced(producerEpoch))
-      throw new ProducerFencedException(s"Invalid producer epoch: $producerEpoch (zombie): ${this.producerEpoch} (current)")
+      throw new ProducerFencedException(s"Invalid producer epoch: $producerEpoch (zombie): ${currentEntry.producerEpoch} (current)")
 
-    if (this.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
+    if (currentEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
       throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch: ${endTxnMarker.coordinatorEpoch} " +
-        s"(zombie), $coordinatorEpoch (current)")
+        s"(zombie), ${currentEntry.coordinatorEpoch} (current)")
 
-    if (producerEpoch != this.producerEpoch) {
-      // it is possible that this control record is the first record seen from a new epoch (the producer
-      // may fail before sending to the partition or the request itself could fail for some reason). In this
-      // case, we bump the epoch and reset the sequence numbers
-      this.producerEpoch = producerEpoch
-      this.firstSeq = RecordBatch.NO_SEQUENCE
-      this.lastSeq = RecordBatch.NO_SEQUENCE
-    } else {
-      // the control record is the last append to the log, so the last offset will be updated to point to it.
-      // However, the sequence numbers still point to the previous batch, so the duplicate check would no longer
-      // be correct: it would return the wrong offset. To fix this, we treat the control record as a batch
-      // of size 1 which uses the last appended sequence number.
-      this.firstSeq = this.lastSeq
-    }
+    currentEntry.maybeUpdateEpoch(producerEpoch)
 
-    val firstOffset = currentTxnFirstOffset match {
+    val firstOffset = currentEntry.currentTxnFirstOffset match {
       case Some(txnFirstOffset) => txnFirstOffset
       case None =>
         transactions += new TxnMetadata(producerId, offset)
         offset
     }
 
-    this.lastOffset = offset
-    this.currentTxnFirstOffset = None
-    this.maxTimestamp = timestamp
-    this.coordinatorEpoch = endTxnMarker.coordinatorEpoch
+    currentEntry.currentTxnFirstOffset = None
+    currentEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
     CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
   }
 
-  def lastEntry: ProducerIdEntry =
-    ProducerIdEntry(producerId, producerEpoch, lastSeq, lastOffset, lastSeq - firstSeq, maxTimestamp,
-      coordinatorEpoch, currentTxnFirstOffset)
+  def latestEntry: ProducerIdEntry = currentEntry
 
   def startedTransactions: List[TxnMetadata] = transactions.toList
 
@@ -243,11 +271,11 @@ private[log] class ProducerAppendInfo(val producerId: Long,
   override def toString: String = {
     "ProducerAppendInfo(" +
       s"producerId=$producerId, " +
-      s"producerEpoch=$producerEpoch, " +
-      s"firstSequence=$firstSeq, " +
-      s"lastSequence=$lastSeq, " +
-      s"currentTxnFirstOffset=$currentTxnFirstOffset, " +
-      s"coordinatorEpoch=$coordinatorEpoch, " +
+      s"producerEpoch=${currentEntry.producerEpoch}, " +
+      s"firstSequence=${currentEntry.firstSeq}, " +
+      s"lastSequence=${currentEntry.lastSeq}, " +
+      s"currentTxnFirstOffset=${currentEntry.currentTxnFirstOffset}, " +
+      s"coordinatorEpoch=${currentEntry.coordinatorEpoch}, " +
       s"startedTransactions=$transactions)"
   }
 }
@@ -309,7 +337,7 @@ object ProducerStateManager {
         val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
         val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
         val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
-        val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
+        val newEntry = new ProducerIdEntry(producerId, mutable.Queue[BatchMetadata](BatchMetadata(seq, offset, offsetDelta, timestamp)), producerEpoch,
           coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
         newEntry
       }
@@ -329,9 +357,9 @@ object ProducerStateManager {
         producerEntryStruct.set(ProducerIdField, producerId)
           .set(ProducerEpochField, entry.producerEpoch)
           .set(LastSequenceField, entry.lastSeq)
-          .set(LastOffsetField, entry.lastOffset)
-          .set(OffsetDeltaField, entry.offsetDelta)
-          .set(TimestampField, entry.timestamp)
+          .set(LastOffsetField, entry.lastDataOffset)
+          .set(OffsetDeltaField, entry.lastOffsetDelta)
+          .set(TimestampField, entry.lastTimestamp)
           .set(CoordinatorEpochField, entry.coordinatorEpoch)
           .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
         producerEntryStruct
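
As readSnapshot above shows, the snapshot struct encodes only the tail of the metadata queue, so a
reloaded entry starts with a single cached batch and only the most recent batch is available for
duplicate lookup after a restart. A round-trip sketch with a simplified metadata type (illustrative names):

    import scala.collection.mutable

    object SnapshotRoundTripSketch {
      case class BatchMeta(lastSeq: Int, lastOffset: Long)

      def main(args: Array[String]): Unit = {
        val before = mutable.Queue(BatchMeta(2, 12L), BatchMeta(3, 13L), BatchMeta(4, 14L))
        val written = before.last              // only the tail batch is encoded in the struct
        val reloaded = mutable.Queue(written)  // readSnapshot rebuilds a one-element queue
        assert(reloaded.head == BatchMeta(4, 14L))
      }
    }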
@@ -472,7 +500,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   private def isProducerExpired(currentTimeMs: Long, producerIdEntry: ProducerIdEntry): Boolean =
-    producerIdEntry.currentTxnFirstOffset.isEmpty && currentTimeMs - producerIdEntry.timestamp >= maxProducerIdExpirationMs
+    producerIdEntry.currentTxnFirstOffset.isEmpty && currentTimeMs - producerIdEntry.lastTimestamp >= maxProducerIdExpirationMs
 
   /**
    * Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
@@ -508,7 +536,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo =
-    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.Empty), validateSequenceNumbers,
+    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.empty(producerId)), validateSequenceNumbers,
       loadingFromLog)
 
   /**
@@ -520,7 +548,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
     trace(s"Updated producer ${appendInfo.producerId} state to $appendInfo")
 
-    val entry = appendInfo.lastEntry
+    val entry = appendInfo.latestEntry
     producers.put(appendInfo.producerId, entry)
     appendInfo.startedTransactions.foreach { txn =>
       ongoingTxns.put(txn.firstOffset.messageOffset, txn)
@@ -562,7 +590,8 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFilename(file.getName))
 
   private def isProducerRetained(producerIdEntry: ProducerIdEntry, logStartOffset: Long): Boolean = {
-    producerIdEntry.lastOffset >= logStartOffset
+    producerIdEntry.removeBatchesOlderThan(logStartOffset)
+    producerIdEntry.lastDataOffset >= logStartOffset
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 025617f..c4f7ce0 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -115,7 +115,7 @@ object DumpLogSegments {
         case Log.TimeIndexFileSuffix =>
           dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
         case Log.PidSnapshotFileSuffix =>
-          dumpPidSnapshot(file)
+          dumpProducerIdSnapshot(file)
         case Log.TxnIndexFileSuffix =>
           dumpTxnIndex(file)
         case _ =>
@@ -152,12 +152,12 @@ object DumpLogSegments {
     }
   }
 
-  private def dumpPidSnapshot(file: File): Unit = {
+  private def dumpProducerIdSnapshot(file: File): Unit = {
     try {
-      ProducerStateManager.readSnapshot(file).foreach { entry=>
-        println(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} lastSequence: ${entry.lastSeq} " +
-          s"lastOffset: ${entry.lastOffset} offsetDelta: ${entry.offsetDelta} lastTimestamp: ${entry.timestamp} " +
-          s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset}")
+      ProducerStateManager.readSnapshot(file).foreach { entry =>
+        println(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} " +
+          s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset} " +
+          s"cachedMetadata: ${entry.batchMetadata}")
       }
     } catch {
       case e: CorruptSnapshotException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index a11972e..1bde7b1 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -122,7 +122,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
 
     val producerConfig = new Properties()
     producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")
     val producerConfigWithCompression = new Properties()
     producerConfigWithCompression ++= producerConfig
     producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 810f481..1a85d34 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -24,12 +24,15 @@ import kafka.server.KafkaConfig
 import kafka.utils.{ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
 
 import scala.collection.JavaConverters._
 import org.junit.Assert._
 
+import scala.collection.mutable
+
 
 class TransactionsBounceTest extends KafkaServerTestHarness {
   private val producerBufferSize =  65536
@@ -76,12 +79,12 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
     // basic idea is to seed a topic with 10000 records, and copy it transactionally while bouncing brokers
     // constantly through the period.
     val consumerGroup = "myGroup"
-    val numInputRecords = 5000
+    val numInputRecords = 10000
     createTopics()
 
     TestUtils.seedTopicWithNumberedRecords(inputTopic, numInputRecords, servers)
     val consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
-    val producer = TestUtils.createTransactionalProducer("test-txn", servers)
+    val producer = TestUtils.createTransactionalProducer("test-txn", servers, 512)
 
     producer.initTransactions()
 
@@ -125,9 +128,20 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
     scheduler.shutdown()
 
     val verifyingConsumer = createConsumerAndSubscribeToTopics("randomGroup", List(outputTopic), readCommitted = true)
-    val outputRecords = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).map { record =>
-      TestUtils.assertCommittedAndGetValue(record).toInt
+    val recordsByPartition = new mutable.HashMap[TopicPartition, mutable.ListBuffer[Int]]()
+    TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).foreach { record =>
+      val value = TestUtils.assertCommittedAndGetValue(record).toInt
+      val topicPartition = new TopicPartition(record.topic(), record.partition())
+      recordsByPartition.getOrElseUpdate(topicPartition, new mutable.ListBuffer[Int])
+        .append(value)
+    }
+
+    val outputRecords = new mutable.ListBuffer[Int]()
+    recordsByPartition.values.foreach { partitionValues =>
+      assertEquals("Out of order messages detected", partitionValues, partitionValues.sorted)
+      outputRecords.appendAll(partitionValues)
     }
+
     val recordSet = outputRecords.toSet
     assertEquals(numInputRecords, recordSet.size)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 2a07532..61a8492 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -17,6 +17,7 @@
  package kafka.log
 
 import java.io.File
+import scala.collection.mutable
 
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
@@ -317,7 +318,7 @@ class LogSegmentTest {
 
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = new ProducerStateManager(topicPartition, logDir)
-    stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
+    stateManager.loadProducerEntry(new ProducerIdEntry(pid2, mutable.Queue[BatchMetadata](BatchMetadata(10, 90L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, 0, Some(75L)))
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ad64e39..2ae62c5 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -418,14 +418,11 @@ class LogTest {
 
     log.truncateTo(baseOffset + 4)
 
-    val activeProducers = log.activeProducers
+    val activeProducers = log.activeProducersWithLastSequence
     assertTrue(activeProducers.contains(pid))
 
-    val entry = activeProducers(pid)
-    assertEquals(0, entry.firstSeq)
-    assertEquals(baseOffset, entry.firstOffset)
-    assertEquals(3, entry.lastSeq)
-    assertEquals(baseOffset + 3, entry.lastOffset)
+    val lastSeq = activeProducers(pid)
+    assertEquals(3, lastSeq)
   }
 
   @Test
@@ -462,14 +459,11 @@ class LogTest {
 
     log.truncateTo(baseOffset + 2)
 
-    val activeProducers = log.activeProducers
+    val activeProducers = log.activeProducersWithLastSequence
     assertTrue(activeProducers.contains(pid))
 
-    val entry = activeProducers(pid)
-    assertEquals(0, entry.firstSeq)
-    assertEquals(baseOffset, entry.firstOffset)
-    assertEquals(1, entry.lastSeq)
-    assertEquals(baseOffset + 1, entry.lastOffset)
+    val lastSeq = activeProducers(pid)
+    assertEquals(1, lastSeq)
   }
 
   @Test
@@ -498,14 +492,11 @@ class LogTest {
     val filteredRecords = MemoryRecords.readableRecords(filtered)
 
     log.appendAsFollower(filteredRecords)
-    val activeProducers = log.activeProducers
+    val activeProducers = log.activeProducersWithLastSequence
     assertTrue(activeProducers.contains(pid))
 
-    val entry = activeProducers(pid)
-    assertEquals(0, entry.firstSeq)
-    assertEquals(baseOffset, entry.firstOffset)
-    assertEquals(3, entry.lastSeq)
-    assertEquals(baseOffset + 3, entry.lastOffset)
+    val lastSeq = activeProducers(pid)
+    assertEquals(3, lastSeq)
   }
 
   @Test
@@ -547,13 +538,13 @@ class LogTest {
     }
 
     log.truncateTo(1L)
-    assertEquals(1, log.activeProducers.size)
+    assertEquals(1, log.activeProducersWithLastSequence.size)
 
-    val pidEntryOpt = log.activeProducers.get(pid)
-    assertTrue(pidEntryOpt.isDefined)
+    val lastSeqOpt = log.activeProducersWithLastSequence.get(pid)
+    assertTrue(lastSeqOpt.isDefined)
 
-    val pidEntry = pidEntryOpt.get
-    assertEquals(0, pidEntry.lastSeq)
+    val lastSeq = lastSeqOpt.get
+    assertEquals(0, lastSeq)
   }
 
   @Test
@@ -568,21 +559,21 @@ class LogTest {
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
-    assertEquals(2, log.activeProducers.size)
+    assertEquals(2, log.activeProducersWithLastSequence.size)
 
     log.maybeIncrementLogStartOffset(1L)
 
-    assertEquals(1, log.activeProducers.size)
-    val retainedEntryOpt = log.activeProducers.get(pid2)
-    assertTrue(retainedEntryOpt.isDefined)
-    assertEquals(0, retainedEntryOpt.get.lastSeq)
+    assertEquals(1, log.activeProducersWithLastSequence.size)
+    val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertTrue(retainedLastSeqOpt.isDefined)
+    assertEquals(0, retainedLastSeqOpt.get)
 
     log.close()
 
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
-    assertEquals(1, reloadedLog.activeProducers.size)
-    val reloadedEntryOpt = log.activeProducers.get(pid2)
-    assertEquals(retainedEntryOpt, reloadedEntryOpt)
+    assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
+    val reloadedLastSeqOpt = reloadedLog.activeProducersWithLastSequence.get(pid2)
+    assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
   @Test
@@ -600,24 +591,24 @@ class LogTest {
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
 
     assertEquals(2, log.logSegments.size)
-    assertEquals(2, log.activeProducers.size)
+    assertEquals(2, log.activeProducersWithLastSequence.size)
 
     log.maybeIncrementLogStartOffset(1L)
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals(1, log.logSegments.size)
-    assertEquals(1, log.activeProducers.size)
-    val retainedEntryOpt = log.activeProducers.get(pid2)
-    assertTrue(retainedEntryOpt.isDefined)
-    assertEquals(0, retainedEntryOpt.get.lastSeq)
+    assertEquals(1, log.activeProducersWithLastSequence.size)
+    val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
+    assertTrue(retainedLastSeqOpt.isDefined)
+    assertEquals(0, retainedLastSeqOpt.get)
 
     log.close()
 
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
-    assertEquals(1, reloadedLog.activeProducers.size)
-    val reloadedEntryOpt = log.activeProducers.get(pid2)
-    assertEquals(retainedEntryOpt, reloadedEntryOpt)
+    assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
+    val reloadedLastSeqOpt = reloadedLog.activeProducersWithLastSequence.get(pid2)
+    assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
   @Test
@@ -659,13 +650,13 @@ class LogTest {
     log.takeProducerSnapshot()
 
     assertEquals(3, log.logSegments.size)
-    assertEquals(Set(pid1, pid2), log.activeProducers.keySet)
+    assertEquals(Set(pid1, pid2), log.activeProducersWithLastSequence.keySet)
 
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals(2, log.logSegments.size)
-    assertEquals(Set(pid2), log.activeProducers.keySet)
+    assertEquals(Set(pid2), log.activeProducersWithLastSequence.keySet)
   }
 
   @Test
@@ -749,13 +740,13 @@ class LogTest {
     val records = Seq(new SimpleRecord(mockTime.milliseconds(), "foo".getBytes))
     log.appendAsLeader(TestUtils.records(records, producerId = pid, producerEpoch = 0, sequence = 0), leaderEpoch = 0)
 
-    assertEquals(Set(pid), log.activeProducers.keySet)
+    assertEquals(Set(pid), log.activeProducersWithLastSequence.keySet)
 
     mockTime.sleep(producerIdExpirationCheckIntervalMs)
-    assertEquals(Set(pid), log.activeProducers.keySet)
+    assertEquals(Set(pid), log.activeProducersWithLastSequence.keySet)
 
     mockTime.sleep(producerIdExpirationCheckIntervalMs)
-    assertEquals(Set(), log.activeProducers.keySet)
+    assertEquals(Set(), log.activeProducersWithLastSequence.keySet)
   }
 
   @Test
@@ -805,16 +796,22 @@ class LogTest {
       case _: OutOfOrderSequenceException => // Good!
     }
 
-    // Append a Duplicate of an entry in the middle of the log. This is not allowed.
+    // Append a duplicate of the batch which is 4th from the tail. This should succeed without error since we
+    // retain the batch metadata of the last 5 batches.
+    val duplicateOfFourth = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      producerId = pid, producerEpoch = epoch, sequence = 2)
+    log.appendAsLeader(duplicateOfFourth, leaderEpoch = 0)
+
+    // Append a duplicate of an entry older than the last 5 appended batches. This should result in a DuplicateSequenceException.
      try {
       val records = TestUtils.records(
         List(new SimpleRecord(mockTime.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
         producerId = pid, producerEpoch = epoch, sequence = 1)
       log.appendAsLeader(records, leaderEpoch = 0)
-      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
-        "in the middle of the log.")
+      fail ("Should have received an DuplicateSequenceNumberException since we attempted to append a duplicate of a batch" +
+        "which is older than the last 5 appended batches.")
     } catch {
-      case _: OutOfOrderSequenceException => // Good!
+      case _: DuplicateSequenceException => // Good!
     }
 
     // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry.
@@ -872,7 +869,7 @@ class LogTest {
     }
   }
 
-  @Test(expected = classOf[DuplicateSequenceNumberException])
+  @Test(expected = classOf[DuplicateSequenceException])
   def testDuplicateAppendToFollower() : Unit = {
     val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -888,7 +885,7 @@ class LogTest {
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
   }
 
-  @Test(expected = classOf[DuplicateSequenceNumberException])
+  @Test(expected = classOf[DuplicateSequenceException])
   def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = {
     val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)