You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/03 02:41:47 UTC

[1/4] kafka git commit: KAFKA-4817; Add idempotent producer semantics

Repository: kafka
Updated Branches:
  refs/heads/trunk 1ce6aa550 -> bdf4cba04


http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 768c073..a7af24e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -28,8 +28,7 @@ import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.KafkaConfig
-import org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP
-import org.apache.kafka.common.record.{RecordBatch, _}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -69,7 +68,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singletonRecords("test".getBytes)
 
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentMsProp, (1 * 60 * 60L): java.lang.Long)
+    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
 
     // create a log
@@ -77,6 +76,7 @@ class LogTest extends JUnitSuite {
                       LogConfig(logProps),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
+                      maxPidExpirationMs = 24 * 60,
                       scheduler = time.scheduler,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
@@ -120,6 +120,219 @@ class LogTest extends JUnitSuite {
     assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testNonSequentialAppend(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val pid = 1L
+    val epoch: Short = 0
+
+    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 0)
+    log.append(records, assignOffsets = true)
+
+    val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 2)
+    log.append(nextRecords, assignOffsets = true)
+  }
+
+  @Test
+  def testDuplicateAppends(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val pid = 1L
+    val epoch: Short = 0
+
+    var seq = 0
+    // Pad the beginning of the log.
+    for (i <- 0 to 5) {
+      val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
+        pid = pid, epoch = epoch, sequence = seq)
+      log.append(record, assignOffsets = true)
+      seq = seq + 1
+    }
+    // Append an entry with multiple log records.
+    var record = TestUtils.records(List(
+      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
+      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
+      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)
+    ), pid = pid, epoch = epoch, sequence = seq)
+    val multiEntryAppendInfo = log.append(record, assignOffsets = true)
+    assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3)
+    seq = seq + 3
+
+    // Append a Duplicate of the tail, when the entry at the tail has multiple records.
+    val dupMultiEntryAppendInfo = log.append(record, assignOffsets = true)
+    assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
+      multiEntryAppendInfo.firstOffset, dupMultiEntryAppendInfo.firstOffset)
+    assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
+      multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset)
+
+    // Append a partial duplicate of the tail. This is not allowed.
+    try {
+      record = TestUtils.records(
+        List(
+          new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
+          new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)),
+        pid = pid, epoch = epoch, sequence = seq - 2)
+      log.append(record, assignOffsets = true)
+      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " +
+        "in the middle of the log.")
+    } catch {
+      case e: OutOfOrderSequenceException => // Good!
+    }
+
+    // Append a Duplicate of an entry in the middle of the log. This is not allowed.
+     try {
+      record = TestUtils.records(
+        List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
+        pid = pid, epoch = epoch, sequence = 1)
+      log.append(record, assignOffsets = true)
+      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " +
+        "in the middle of the log.")
+    } catch {
+      case e: OutOfOrderSequenceException => // Good!
+    }
+
+    // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry.
+    record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
+      pid = pid, epoch = epoch, sequence = seq)
+    val origAppendInfo = log.append(record, assignOffsets = true)
+    val newAppendInfo = log.append(record, assignOffsets = true)
+    assertEquals("Inserted a duplicate record into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset)
+    assertEquals("Inserted a duplicate record into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset)
+  }
+
+  @Test
+  def testMulitplePidsPerMemoryRecord() : Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val epoch: Short = 0
+
+    val buffer = ByteBuffer.allocate(512)
+
+    var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 3L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 4L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    buffer.flip()
+    val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+    log.append(memoryRecords, assignOffsets = false)
+    log.flush()
+
+    val fetchedData = log.read(0, Int.MaxValue)
+
+    val origIterator = memoryRecords.batches.iterator()
+    for (batch <- fetchedData.records.batches.asScala) {
+      assertTrue(origIterator.hasNext)
+      val origEntry = origIterator.next()
+      assertEquals(origEntry.producerId, batch.producerId)
+      assertEquals(origEntry.baseOffset, batch.baseOffset)
+      assertEquals(origEntry.baseSequence, batch.baseSequence)
+    }
+  }
+
+  @Test(expected = classOf[DuplicateSequenceNumberException])
+  def testMultiplePidsWithDuplicates() : Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val epoch: Short = 0
+
+    val buffer = ByteBuffer.allocate(512)
+
+    var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a record with other pids.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 1L, epoch, 1)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 2L, epoch, 1)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 4L, time.milliseconds(), 1L, epoch, 1)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    buffer.flip()
+
+    log.append(MemoryRecords.readableRecords(buffer), assignOffsets = false)
+    // Should throw a duplicate seqeuence exception here.
+    fail("should have thrown a DuplicateSequenceNumberException.")
+  }
+
+  @Test(expected = classOf[ProducerFencedException])
+  def testOldProducerEpoch(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val pid = 1L
+    val newEpoch: Short = 1
+    val oldEpoch: Short = 0
+
+    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = newEpoch, sequence = 0)
+    log.append(records, assignOffsets = true)
+
+    val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = oldEpoch, sequence = 0)
+    log.append(nextRecords, assignOffsets = true)
+  }
+
   /**
    * Test for jitter s for time based log roll. This test appends messages then changes the time
    * using the mock clock to force the log to roll and checks the number of segments.
@@ -167,7 +380,7 @@ class LogTest extends JUnitSuite {
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     // create a log
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -183,7 +396,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds))
   }
 
@@ -196,7 +409,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
     for(value <- values)
@@ -220,7 +433,7 @@ class LogTest extends JUnitSuite {
   def testAppendAndReadWithNonSequentialOffsets() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -245,7 +458,7 @@ class LogTest extends JUnitSuite {
   def testReadAtLogGap() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -262,7 +475,7 @@ class LogTest extends JUnitSuite {
   def testReadWithMinMessage() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -290,7 +503,7 @@ class LogTest extends JUnitSuite {
   def testReadWithTooSmallMaxLength() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -326,7 +539,7 @@ class LogTest extends JUnitSuite {
 
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "42".getBytes))
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
@@ -357,7 +570,7 @@ class LogTest extends JUnitSuite {
     /* create a multipart log with 100 messages */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
                                                                                 timestamp = time.milliseconds))
@@ -395,7 +608,7 @@ class LogTest extends JUnitSuite {
     /* this log should roll after every messageset */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
@@ -421,7 +634,7 @@ class LogTest extends JUnitSuite {
       val logProps = new Properties()
       logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
       logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
-      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10))
 
@@ -457,7 +670,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     try {
       log.append(messageSet)
@@ -484,7 +697,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
 
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     try {
       log.append(messageSetWithUnkeyedMessage)
@@ -526,7 +739,7 @@ class LogTest extends JUnitSuite {
     val maxMessageSize = second.sizeInBytes - 1
     val logProps = new Properties()
     logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -552,7 +765,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
         timestamp = time.milliseconds + i * 10))
@@ -578,12 +791,12 @@ class LogTest extends JUnitSuite {
       assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
     }
 
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler, time = time)
     verifyRecoveredLog(log)
     log.close()
 
     // test recovery case
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     verifyRecoveredLog(log)
     log.close()
   }
@@ -599,7 +812,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     val messages = (0 until numMessages).map { i =>
       MemoryRecords.withRecords(100 + i, CompressionType.NONE, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
@@ -623,7 +836,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -635,7 +848,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // reopen the log
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
@@ -662,7 +875,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
@@ -672,7 +885,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // The rebuilt time index should be empty
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler, time = time)
     val segArray = log.logSegments.toArray
     for (i <- 0 until segArray.size - 1) {
       assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
@@ -693,7 +906,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -715,7 +928,7 @@ class LogTest extends JUnitSuite {
     }
 
     // reopen the log
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler, time = time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -842,8 +1055,8 @@ class LogTest extends JUnitSuite {
                       LogConfig(logProps),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
     assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0)
@@ -874,8 +1087,8 @@ class LogTest extends JUnitSuite {
                       config,
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for (_ <- 0 until 100)
@@ -885,8 +1098,8 @@ class LogTest extends JUnitSuite {
                   config,
                   logStartOffset = 0L,
                   recoveryPoint = 0L,
-                  time.scheduler,
-                  time)
+                  scheduler = time.scheduler,
+                  time = time)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
@@ -911,8 +1124,8 @@ class LogTest extends JUnitSuite {
                       config,
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -952,8 +1165,8 @@ class LogTest extends JUnitSuite {
                       config,
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -967,8 +1180,8 @@ class LogTest extends JUnitSuite {
                   config,
                   logStartOffset = 0L,
                   recoveryPoint = 0L,
-                  time.scheduler,
-                  time)
+                  scheduler = time.scheduler,
+                  time = time)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
 
@@ -978,8 +1191,8 @@ class LogTest extends JUnitSuite {
                       LogConfig(),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
     log.append(TestUtils.singletonRecords(value = null))
     val head = log.read(0, 4096, None).records.records.iterator.next()
     assertEquals(0, head.offset)
@@ -992,8 +1205,8 @@ class LogTest extends JUnitSuite {
       LogConfig(),
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => log.append(MemoryRecords.withRecords(CompressionType.NONE, record)))
     val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
@@ -1006,8 +1219,8 @@ class LogTest extends JUnitSuite {
       LogConfig(),
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     log.append(MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)))
   }
@@ -1029,8 +1242,8 @@ class LogTest extends JUnitSuite {
                         config,
                         logStartOffset = 0L,
                         recoveryPoint = 0L,
-                        time.scheduler,
-                        time)
+                        scheduler = time.scheduler,
+                        time = time)
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
         log.append(set)
@@ -1072,8 +1285,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
     val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
     val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
@@ -1121,8 +1334,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     for (_ <- 0 until 100)
       log.append(set)
     log.close()
@@ -1221,8 +1434,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -1366,8 +1579,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     log
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
new file mode 100644
index 0000000..d27f0ca
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
@@ -0,0 +1,224 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.log
+
+import java.io.File
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, OutOfOrderSequenceException, ProducerFencedException}
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+class ProducerIdMappingTest extends JUnitSuite {
+  var idMappingDir: File = null
+  var config: LogConfig = null
+  var idMapping: ProducerIdMapping = null
+  val partition = new TopicPartition("test", 0)
+  val pid = 1L
+  val maxPidExpirationMs = 60 * 1000
+  val time = new MockTime
+
+  @Before
+  def setUp(): Unit = {
+    // Create configuration including number of snapshots to hold
+    val props = new Properties()
+    config = LogConfig(props)
+
+    // Create temporary directory
+    idMappingDir = TestUtils.tempDir()
+
+    // Instantiate IdMapping
+    idMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs)
+  }
+
+  @After
+  def tearDown(): Unit = {
+    idMappingDir.listFiles().foreach(f => f.delete())
+    idMappingDir.deleteOnExit()
+  }
+
+  @Test
+  def testBasicIdMapping(): Unit = {
+    val epoch = 0.toShort
+
+    // First entry for id 0 added
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L)
+
+    // Second entry for id 0 added
+    checkAndUpdate(idMapping, pid, 1, epoch, 0L, 1L)
+
+    // Duplicate sequence number (matches previous sequence number)
+    assertThrows[DuplicateSequenceNumberException] {
+      checkAndUpdate(idMapping, pid, 1, epoch, 0L, 1L)
+    }
+
+    // Invalid sequence number (greater than next expected sequence number)
+    assertThrows[OutOfOrderSequenceException] {
+      checkAndUpdate(idMapping, pid, 5, epoch, 0L, 2L)
+    }
+
+    // Change epoch
+    checkAndUpdate(idMapping, pid, 0, (epoch + 1).toShort, 0L, 3L)
+
+    // Incorrect epoch
+    assertThrows[ProducerFencedException] {
+      checkAndUpdate(idMapping, pid, 0, epoch, 0L, 4L)
+    }
+  }
+
+  @Test
+  def testTakeSnapshot(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, 1L)
+
+    // Take snapshot
+    idMapping.maybeTakeSnapshot()
+
+    // Check that file exists and it is not empty
+    assertEquals("Directory doesn't contain a single file as expected", 1, idMappingDir.list().length)
+    assertTrue("Snapshot file is empty", idMappingDir.list().head.length > 0)
+  }
+
+  @Test
+  def testRecoverFromSnapshot(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, time.milliseconds)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, time.milliseconds)
+    idMapping.maybeTakeSnapshot()
+    val recoveredMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(3L, time.milliseconds)
+
+    // entry added after recovery
+    checkAndUpdate(recoveredMapping, pid, 2, epoch, 2L, time.milliseconds)
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testRemoveExpiredPidsOnReload(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, 1)
+
+    idMapping.maybeTakeSnapshot()
+    val recoveredMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(1L, 70000)
+
+    // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence
+    // we should get an out of order sequence exception.
+    checkAndUpdate(recoveredMapping, pid, 2, epoch, 2L, 70001)
+  }
+
+
+  @Test
+  def testRemoveOldSnapshot(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 1L)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, 2L)
+
+    idMapping.maybeTakeSnapshot()
+
+    checkAndUpdate(idMapping, pid, 2, epoch, 2L, 3L)
+
+    idMapping.maybeTakeSnapshot()
+
+    assertEquals(s"number of snapshot files is incorrect: ${idMappingDir.listFiles().length}",
+               1, idMappingDir.listFiles().length)
+  }
+
+  @Test
+  def testSkipSnapshotIfOffsetUnchanged(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L)
+
+    idMapping.maybeTakeSnapshot()
+
+    // nothing changed so there should be no new snapshot
+    idMapping.maybeTakeSnapshot()
+
+    assertEquals(s"number of snapshot files is incorrect: ${idMappingDir.listFiles().length}",
+      1, idMappingDir.listFiles().length)
+  }
+
+  @Test
+  def testStartOffset(): Unit = {
+    val epoch = 0.toShort
+    val pid2 = 2L
+    checkAndUpdate(idMapping, pid2, 0, epoch, 0L, 1L)
+    checkAndUpdate(idMapping, pid, 0, epoch, 1L, 2L)
+    checkAndUpdate(idMapping, pid, 1, epoch, 2L, 3L)
+    checkAndUpdate(idMapping, pid, 2, epoch, 3L, 4L)
+    idMapping.maybeTakeSnapshot()
+
+    intercept[OutOfOrderSequenceException] {
+      val recoveredMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs)
+      recoveredMapping.truncateAndReload(1L, time.milliseconds)
+      checkAndUpdate(recoveredMapping, pid2, 1, epoch, 4L, 5L)
+    }
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testPidExpirationTimeout() {
+    val epoch = 5.toShort
+    val sequence = 37
+    checkAndUpdate(idMapping, pid, sequence, epoch, 1L)
+    time.sleep(maxPidExpirationMs + 1)
+    idMapping.checkForExpiredPids(time.milliseconds)
+    checkAndUpdate(idMapping, pid, sequence + 1, epoch, 1L)
+  }
+
+  @Test
+  def testLoadPid() {
+    val epoch = 5.toShort
+    val sequence = 37
+    val createTimeMs = time.milliseconds
+    idMapping.load(pid, ProducerIdEntry(epoch, sequence, 0L, 1, createTimeMs), time.milliseconds)
+    checkAndUpdate(idMapping, pid, sequence + 1, epoch, 2L)
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testLoadIgnoresExpiredPids() {
+    val epoch = 5.toShort
+    val sequence = 37
+
+    val createTimeMs = time.milliseconds
+    time.sleep(maxPidExpirationMs + 1)
+    val loadTimeMs = time.milliseconds
+    idMapping.load(pid, ProducerIdEntry(epoch, sequence, 0L, 1, createTimeMs), loadTimeMs)
+
+    // entry wasn't loaded, so this should fail
+    checkAndUpdate(idMapping, pid, sequence + 1, epoch, 2L)
+  }
+
+  private def checkAndUpdate(mapping: ProducerIdMapping,
+                             pid: Long,
+                             seq: Int,
+                             epoch: Short,
+                             lastOffset: Long,
+                             timestamp: Long = time.milliseconds()): Unit = {
+    val numRecords = 1
+    val incomingPidEntry = ProducerIdEntry(epoch, seq, lastOffset, numRecords, timestamp)
+    val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid).getOrElse(ProducerIdEntry.Empty))
+    producerAppendInfo.append(incomingPidEntry)
+    mapping.update(producerAppendInfo)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index b6e3607..f868032 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -201,6 +201,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val start = System.currentTimeMillis()
 
     //Start the new broker (and hence start replicating)
+    debug("Starting new broker")
     brokers = brokers :+ createServer(fromProps(createBrokerConfig(101, zkConnect)))
     waitForOffsetsToMatch(msgCount, 0, 101)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9ae7195..8766855 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -937,9 +937,10 @@ object TestUtils extends Logging {
                    flushRecoveryOffsetCheckpointMs = 10000L,
                    flushStartOffsetCheckpointMs = 10000L,
                    retentionCheckMs = 1000L,
+                   maxPidExpirationMs = 60 * 60 * 1000,
                    scheduler = time.scheduler,
                    time = time,
-                   brokerState = new BrokerState())
+                   brokerState = BrokerState())
   }
 
   @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
index 918c4b5..db62020 100755
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -40,6 +40,21 @@ class ZkUtilsTest extends ZooKeeperTestHarness {
     assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, 0))
   }
 
+  // Verify behaviour of ZkUtils.createSequentialPersistentPath since PIDManager relies on it
+  @Test
+  def testPersistentSequentialPath() {
+    // Given an existing path
+    zkUtils.createPersistentPath(path)
+
+    var result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
+
+    assertEquals("/path/sequence_0000000000", result)
+
+    result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
+
+    assertEquals("/path/sequence_0000000001", result)
+  }
+
   @Test
   def testAbortedConditionalDeletePath() {
     // Given an existing path that gets updated

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 859e3c4..3cf3abd 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -15,25 +15,22 @@
 
 import json
 import os
-import signal
 import time
 
 from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.cluster.remoteaccount import RemoteCommandError
-from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.utils import is_int, is_int_with_prefix
 from kafkatest.version import DEV_BRANCH
-from kafkatest.utils.remote_account import line_count
 
 class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
     """This service wraps org.apache.kafka.tools.VerifiableProducer for use in
-    system testing. 
+    system testing.
 
     NOTE: this class should be treated as a PUBLIC API. Downstream users use
-    this service both directly and through class extension, so care must be 
+    this service both directly and through class extension, so care must be
     taken to ensure compatibility.
     """
 
@@ -59,7 +56,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
 
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
-                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO"):
+                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -92,6 +89,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.acks = acks
         self.stop_timeout_sec = stop_timeout_sec
         self.request_timeout_sec = request_timeout_sec
+        self.enable_idempotence = enable_idempotence
 
     def java_class_name(self):
         return "VerifiableProducer"
@@ -124,6 +122,12 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             producer_prop_file += "\nacks=%s\n" % self.acks
 
         producer_prop_file += "\nrequest.timeout.ms=%d\n" % (self.request_timeout_sec * 1000)
+        if self.enable_idempotence:
+            self.logger.info("Setting up an idempotent producer")
+            producer_prop_file += "\nmax.in.flight.requests.per.connection=1\n"
+            producer_prop_file += "\nretries=50\n"
+            producer_prop_file += "\nenable.idempotence=true\n"
+
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index 3e17d56..5d96d7b 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -104,11 +104,12 @@ class ReplicationTest(ProduceConsumeValidateTest):
 
         self.topic = "test_topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}
-                                                                })
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
+                                  topics={self.topic: {
+                                      "partitions": 3,
+                                      "replication-factor": 3,
+                                      'configs': {"min.insync.replicas": 2}}
+                                  })
         self.producer_throughput = 1000
         self.num_producers = 1
         self.num_consumers = 1
@@ -123,6 +124,10 @@ class ReplicationTest(ProduceConsumeValidateTest):
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
+            security_protocol=["PLAINTEXT"],
+            enable_idempotence=[True])
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            broker_type=["leader"],
             security_protocol=["PLAINTEXT", "SASL_SSL"])
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["controller"],
@@ -133,7 +138,10 @@ class ReplicationTest(ProduceConsumeValidateTest):
     @parametrize(failure_mode="hard_bounce",
             broker_type="leader",
             security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512")
-    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
+    def test_replication_with_broker_failure(self, failure_mode, security_protocol,
+                                             broker_type, client_sasl_mechanism="GSSAPI",
+                                             interbroker_sasl_mechanism="GSSAPI",
+                                             enable_idempotence=False):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
@@ -152,8 +160,8 @@ class ReplicationTest(ProduceConsumeValidateTest):
         self.kafka.client_sasl_mechanism = client_sasl_mechanism
         self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+        self.enable_idempotence = enable_idempotence
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput, enable_idempotence=enable_idempotence)
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
         self.kafka.start()
-        
         self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index cad9150..079305c 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -42,6 +42,7 @@ class ProduceConsumeValidateTest(Test):
         # producer begins producing messages, in which case we will miss the
         # initial set of messages and get spurious test failures.
         self.consumer_init_timeout_sec = 0
+        self.enable_idempotence = False
 
     def setup_producer_and_consumer(self):
         raise NotImplementedError("Subclasses should implement this")
@@ -67,7 +68,7 @@ class ProduceConsumeValidateTest(Test):
             remaining_time = self.consumer_init_timeout_sec - (end - start)
             if remaining_time < 0 :
                 remaining_time = 0
-            if self.consumer.new_consumer is True:
+            if self.consumer.new_consumer:
                 wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
                            timeout_sec=remaining_time,
                            err_msg="Consumer process took more than %d s to have partitions assigned" %\
@@ -167,9 +168,17 @@ class ProduceConsumeValidateTest(Test):
             msg = self.annotate_data_lost(data_lost, msg, len(to_validate))
 
 
+        if self.enable_idempotence:
+            self.logger.info("Ran a test with idempotence enabled. We expect no duplicates")
+        else:
+            self.logger.info("Ran a test with idempotence disabled.")
+
         # Are there duplicates?
         if len(set(consumed)) != len(consumed):
-            msg += "(There are also %s duplicate messages in the log - but that is an acceptable outcome)\n" % abs(len(set(consumed)) - len(consumed))
+            num_duplicates = abs(len(set(consumed)) - len(consumed))
+            msg += "(There are also %s duplicate messages in the log - but that is an acceptable outcome)\n" % num_duplicates
+            if self.enable_idempotence:
+                assert False, "Detected %s duplicates even though idempotence was enabled." % num_duplicates
 
         # Collect all logs if validation fails
         if not success:


[2/4] kafka git commit: KAFKA-4817; Add idempotent producer semantics

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/ProducerIdMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
new file mode 100644
index 0000000..a870b7d
--- /dev/null
+++ b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.file.Files
+
+import kafka.common.KafkaException
+import kafka.utils.{Logging, nonthreadsafe}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, OutOfOrderSequenceException, ProducerFencedException}
+import org.apache.kafka.common.protocol.types._
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
+
+import scala.collection.{immutable, mutable}
+
+private[log] object ProducerIdEntry {
+  val Empty = ProducerIdEntry(RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+    -1, 0, RecordBatch.NO_TIMESTAMP)
+}
+
+private[log] case class ProducerIdEntry(epoch: Short, lastSeq: Int, lastOffset: Long, numRecords: Int, timestamp: Long) {
+  def firstSeq: Int = lastSeq - numRecords + 1
+  def firstOffset: Long = lastOffset - numRecords + 1
+
+  def isDuplicate(batch: RecordBatch): Boolean = {
+    batch.producerEpoch == epoch &&
+      batch.baseSequence == firstSeq &&
+      batch.lastSequence == lastSeq
+  }
+}
+
+private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEntry) {
+  // the initialEntry here is the last successfull appended batch. we validate incoming entries transitively, starting
+  // with the last appended entry.
+  private var epoch = initialEntry.epoch
+  private var firstSeq = initialEntry.firstSeq
+  private var lastSeq = initialEntry.lastSeq
+  private var lastOffset = initialEntry.lastOffset
+  private var maxTimestamp = initialEntry.timestamp
+
+  private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int) = {
+    if (this.epoch > epoch) {
+      throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer with a newer epoch. $epoch (request epoch), ${this.epoch} (server epoch)")
+    } else if (this.epoch == RecordBatch.NO_PRODUCER_EPOCH || this.epoch < epoch) {
+      if (firstSeq != 0)
+        throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " +
+          s"(request epoch), $firstSeq (seq. number)")
+    } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
+      throw new DuplicateSequenceNumberException(s"Duplicate sequence number: $pid (pid), $firstSeq " +
+        s"(seq. number), ${this.firstSeq} (expected seq. number)")
+    } else if (firstSeq != this.lastSeq + 1L) {
+      throw new OutOfOrderSequenceException(s"Invalid sequence number: $pid (pid), $firstSeq " +
+        s"(seq. number), ${this.lastSeq} (expected seq. number)")
+    }
+  }
+
+  def assignLastOffsetAndTimestamp(lastOffset: Long, lastTimestamp: Long): Unit = {
+    this.lastOffset = lastOffset
+    this.maxTimestamp = lastTimestamp
+  }
+
+  private def append(epoch: Short, firstSeq: Int, lastSeq: Int, lastTimestamp: Long, lastOffset: Long) {
+    validateAppend(epoch, firstSeq, lastSeq)
+    this.epoch = epoch
+    this.firstSeq = firstSeq
+    this.lastSeq = lastSeq
+    this.maxTimestamp = lastTimestamp
+    this.lastOffset = lastOffset
+  }
+
+  def append(batch: RecordBatch): Unit =
+    append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset)
+
+  def append(entry: ProducerIdEntry): Unit =
+    append(entry.epoch, entry.firstSeq, entry.lastSeq, entry.timestamp, entry.lastOffset)
+
+  def lastEntry: ProducerIdEntry =
+    ProducerIdEntry(epoch, lastSeq, lastOffset, lastSeq - firstSeq + 1, maxTimestamp)
+}
+
+private[log] class CorruptSnapshotException(msg: String) extends KafkaException(msg)
+
+object ProducerIdMapping {
+  private val DirnamePrefix = "pid-mapping"
+  private val FilenameSuffix = "snapshot"
+  private val FilenamePattern = s"^\\d{1,}.$FilenameSuffix".r
+  private val PidSnapshotVersion: Short = 1
+
+  private val VersionField = "version"
+  private val CrcField = "crc"
+  private val PidField = "pid"
+  private val LastSequenceField = "last_sequence"
+  private val EpochField = "epoch"
+  private val LastOffsetField = "last_offset"
+  private val NumRecordsField = "num_records"
+  private val TimestampField = "timestamp"
+  private val PidEntriesField = "pid_entries"
+
+  private val VersionOffset = 0
+  private val CrcOffset = VersionOffset + 2
+  private val PidEntriesOffset = CrcOffset + 4
+
+  private val maxPidSnapshotsToRetain = 2
+
+  val PidSnapshotEntrySchema = new Schema(
+    new Field(PidField, Type.INT64, "The producer ID"),
+    new Field(EpochField, Type.INT16, "Current epoch of the producer"),
+    new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"),
+    new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"),
+    new Field(NumRecordsField, Type.INT32, "The number of records written in the last log entry"),
+    new Field(TimestampField, Type.INT64, "Max timestamp from the last written entry"))
+  val PidSnapshotMapSchema = new Schema(
+    new Field(VersionField, Type.INT16, "Version of the snapshot file"),
+    new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
+    new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table"))
+
+  private def loadSnapshot(file: File, pidMap: mutable.Map[Long, ProducerIdEntry],
+                           checkNotExpired: (ProducerIdEntry) => Boolean) {
+    val buffer = Files.readAllBytes(file.toPath)
+    val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
+
+    val version = struct.getShort(VersionField)
+    if (version != PidSnapshotVersion)
+      throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
+
+    val crc = struct.getUnsignedInt(CrcField)
+    val computedCrc =  Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
+    if (crc != computedCrc)
+      throw new CorruptSnapshotException(s"Snapshot file is corrupted (CRC is no longer valid). Stored crc: ${crc}. Computed crc: ${computedCrc}")
+
+    struct.getArray(PidEntriesField).foreach { pidEntryObj =>
+      val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]
+      val pid = pidEntryStruct.getLong(PidField)
+      val epoch = pidEntryStruct.getShort(EpochField)
+      val seq = pidEntryStruct.getInt(LastSequenceField)
+      val offset = pidEntryStruct.getLong(LastOffsetField)
+      val timestamp = pidEntryStruct.getLong(TimestampField)
+      val numRecords = pidEntryStruct.getInt(NumRecordsField)
+      val newEntry = ProducerIdEntry(epoch, seq, offset, numRecords, timestamp)
+      if (checkNotExpired(newEntry))
+        pidMap.put(pid, newEntry)
+    }
+  }
+
+  private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) {
+    val struct = new Struct(PidSnapshotMapSchema)
+    struct.set(VersionField, PidSnapshotVersion)
+    struct.set(CrcField, 0L) // we'll fill this after writing the entries
+    val entriesArray = entries.map {
+      case (pid, entry) =>
+        val pidEntryStruct = struct.instance(PidEntriesField)
+        pidEntryStruct.set(PidField, pid)
+          .set(EpochField, entry.epoch)
+          .set(LastSequenceField, entry.lastSeq)
+          .set(LastOffsetField, entry.lastOffset)
+          .set(NumRecordsField, entry.numRecords)
+          .set(TimestampField, entry.timestamp)
+        pidEntryStruct
+    }.toArray
+    struct.set(PidEntriesField, entriesArray)
+
+    val buffer = ByteBuffer.allocate(struct.sizeOf)
+    struct.writeTo(buffer)
+    buffer.flip()
+
+    // now fill in the CRC
+    val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset)
+    ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
+
+    val fos = new FileOutputStream(file)
+    try {
+      fos.write(buffer.array, buffer.arrayOffset, buffer.limit)
+    } finally {
+      fos.close()
+    }
+  }
+
+  private def verifyFileName(name: String): Boolean = FilenamePattern.findFirstIn(name).isDefined
+
+  private def offsetFromFile(file: File): Long = {
+    s"${file.getName.replace(s".$FilenameSuffix", "")}".toLong
+  }
+
+  private def formatFileName(lastOffset: Long): String = {
+    // The files will be named '$lastOffset.snapshot' and located in 'logDir/pid-mapping'
+    s"$lastOffset.$FilenameSuffix"
+  }
+
+}
+
+/**
+ * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g.
+ * epoch, sequence number, last offset, etc.)
+ *
+ * The sequence number is the last number successfully appended to the partition for the given identifier.
+ * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message
+ * appended to the partition.
+ */
+@nonthreadsafe
+class ProducerIdMapping(val config: LogConfig,
+                        val topicPartition: TopicPartition,
+                        val snapParentDir: File,
+                        val maxPidExpirationMs: Int) extends Logging {
+  import ProducerIdMapping._
+
+  val snapDir: File = new File(snapParentDir, DirnamePrefix)
+  snapDir.mkdir()
+
+  private val pidMap = mutable.Map[Long, ProducerIdEntry]()
+  private var lastMapOffset = 0L
+  private var lastSnapOffset = 0L
+
+  /**
+   * Returns the last offset of this map
+   */
+  def mapEndOffset = lastMapOffset
+
+  /**
+   * Get a copy of the active producers
+   */
+  def activePids: immutable.Map[Long, ProducerIdEntry] = pidMap.toMap
+
+  /**
+   * Load a snapshot of the id mapping or return empty maps
+   * in the case the snapshot doesn't exist (first time).
+   */
+  private def loadFromSnapshot(logEndOffset: Long, checkNotExpired:(ProducerIdEntry) => Boolean) {
+    pidMap.clear()
+
+    var loaded = false
+    while (!loaded) {
+      lastSnapshotFile(logEndOffset) match {
+        case Some(file) =>
+          try {
+            loadSnapshot(file, pidMap, checkNotExpired)
+            lastSnapOffset = offsetFromFile(file)
+            lastMapOffset = lastSnapOffset
+            loaded = true
+          } catch {
+            case e: CorruptSnapshotException =>
+              error(s"Snapshot file at ${file} is corrupt: ${e.getMessage}")
+              file.delete()
+          }
+        case None =>
+          lastSnapOffset = 0L
+          lastMapOffset = 0L
+          snapDir.mkdir()
+          loaded = true
+      }
+    }
+  }
+
+  def isEntryValid(currentTimeMs: Long, producerIdEntry: ProducerIdEntry) : Boolean = {
+    currentTimeMs - producerIdEntry.timestamp < maxPidExpirationMs
+  }
+
+  def checkForExpiredPids(currentTimeMs: Long) {
+    pidMap.retain { case (pid, lastEntry) =>
+      isEntryValid(currentTimeMs, lastEntry)
+    }
+  }
+
+  def truncateAndReload(logEndOffset: Long, currentTime: Long) {
+    truncateSnapshotFiles(logEndOffset)
+    def checkNotExpired = (producerIdEntry: ProducerIdEntry) => { isEntryValid(currentTime, producerIdEntry) }
+    loadFromSnapshot(logEndOffset, checkNotExpired)
+  }
+
+  /**
+   * Update the mapping with the given append information
+   */
+  def update(appendInfo: ProducerAppendInfo): Unit = {
+    if (appendInfo.pid == RecordBatch.NO_PRODUCER_ID)
+      throw new IllegalArgumentException("Invalid PID passed to update")
+    val entry = appendInfo.lastEntry
+    pidMap.put(appendInfo.pid, entry)
+    lastMapOffset = entry.lastOffset + 1
+  }
+
+  /**
+   * Load a previously stored PID entry into the cache. Ignore the entry if the timestamp is older
+   * than the current time minus the PID expiration time (i.e. if the PID has expired).
+   */
+  def load(pid: Long, entry: ProducerIdEntry, currentTimeMs: Long) {
+    if (pid != RecordBatch.NO_PRODUCER_ID && currentTimeMs - entry.timestamp < maxPidExpirationMs) {
+      pidMap.put(pid, entry)
+      lastMapOffset = entry.lastOffset + 1
+    }
+  }
+
+  /**
+   * Get the last written entry for the given PID.
+   */
+  def lastEntry(pid: Long): Option[ProducerIdEntry] = pidMap.get(pid)
+
+  /**
+    * Serialize and write the bytes to a file. The file name is a concatenation of:
+    *   - offset
+    *   - a ".snapshot" suffix
+    *
+    *  The snapshot files are located in the logDirectory, inside a 'pid-mapping' sub directory.
+    */
+  def maybeTakeSnapshot() {
+    // If not a new offset, then it is not worth taking another snapshot
+    if (lastMapOffset > lastSnapOffset) {
+      val file = new File(snapDir, formatFileName(lastMapOffset))
+      writeSnapshot(file, pidMap)
+
+      // Update the last snap offset according to the serialized map
+      lastSnapOffset = lastMapOffset
+
+      maybeRemove()
+    }
+  }
+
+  /**
+    * When we remove the head of the log due to retention, we need to
+    * clean up the id map. This method takes the new start offset and
+    * expires all ids that have a smaller offset.
+    *
+    * @param startOffset New start offset for the log associated to
+    *                    this id map instance
+    */
+  def cleanFrom(startOffset: Long) {
+    pidMap.retain((pid, entry) => entry.firstOffset >= startOffset)
+    if (pidMap.isEmpty)
+      lastMapOffset = -1L
+  }
+
+  private def maybeRemove() {
+    val list = listSnapshotFiles()
+    if (list.size > maxPidSnapshotsToRetain) {
+      // Get file with the smallest offset
+      val toDelete = list.minBy(offsetFromFile)
+      // Delete the last
+      toDelete.delete()
+    }
+  }
+
+  private def listSnapshotFiles(): List[File] = {
+    if (snapDir.exists && snapDir.isDirectory)
+      snapDir.listFiles.filter(f => f.isFile && verifyFileName(f.getName)).toList
+    else
+      List.empty[File]
+  }
+
+  /**
+   * Returns the last valid snapshot with offset smaller than the base offset provided as
+   * a constructor parameter for loading.
+   */
+  private def lastSnapshotFile(maxOffset: Long): Option[File] = {
+    val files = listSnapshotFiles()
+    if (files != null && files.nonEmpty) {
+      val targetOffset = files.foldLeft(0L) { (accOffset, file) =>
+        val snapshotLastOffset = offsetFromFile(file)
+        if ((maxOffset >= snapshotLastOffset) && (snapshotLastOffset > accOffset))
+          snapshotLastOffset
+        else
+          accOffset
+      }
+      val snap = new File(snapDir, formatFileName(targetOffset))
+      if (snap.exists)
+        Some(snap)
+      else
+        None
+    } else
+      None
+  }
+
+  private def truncateSnapshotFiles(maxOffset: Long) {
+    listSnapshotFiles().foreach { file =>
+      val snapshotLastOffset = offsetFromFile(file)
+      if (snapshotLastOffset >= maxOffset)
+        file.delete()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index defbf34..600b84d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
-import java.lang.{Long => JLong, Short => JShort}
+import java.lang.{Long => JLong}
 import java.util.{Collections, Properties}
 import java.util
 
@@ -28,7 +28,7 @@ import kafka.cluster.Partition
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.common._
 import kafka.controller.KafkaController
-import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
+import kafka.coordinator.{GroupCoordinator, InitPidResult, JoinGroupResult, TransactionCoordinator}
 import kafka.log._
 import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
@@ -49,6 +49,7 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse
 
 import scala.collection._
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 /**
  * Logic to handle the various Kafka requests
@@ -56,7 +57,8 @@ import scala.collection.JavaConverters._
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val adminManager: AdminManager,
-                val coordinator: GroupCoordinator,
+                val groupCoordinator: GroupCoordinator,
+                val txnCoordinator: TransactionCoordinator,
                 val controller: KafkaController,
                 val zkUtils: ZkUtils,
                 val brokerId: Int,
@@ -100,6 +102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
         case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
+        case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -138,11 +141,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         // leadership changes
         updatedLeaders.foreach { partition =>
           if (partition.topic == Topic.GroupMetadataTopicName)
-            coordinator.handleGroupImmigration(partition.partitionId)
+            groupCoordinator.handleGroupImmigration(partition.partitionId)
         }
         updatedFollowers.foreach { partition =>
           if (partition.topic == Topic.GroupMetadataTopicName)
-            coordinator.handleGroupEmigration(partition.partitionId)
+            groupCoordinator.handleGroupEmigration(partition.partitionId)
         }
       }
 
@@ -181,7 +184,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         // is not cleared.
         result.foreach { case (topicPartition, error) =>
           if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) {
-            coordinator.handleGroupEmigration(topicPartition.partition)
+            groupCoordinator.handleGroupEmigration(topicPartition.partition)
           }
         }
         new StopReplicaResponse(error, result.asJava)
@@ -202,7 +205,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
         val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
         if (deletedPartitions.nonEmpty)
-          coordinator.handleDeletedPartitions(deletedPartitions)
+          groupCoordinator.handleDeletedPartitions(deletedPartitions)
 
         if (adminManager.hasDelayedTopicOperations) {
           updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
@@ -305,7 +308,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val offsetRetention =
           if (header.apiVersion <= 1 ||
             offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME)
-            coordinator.offsetConfig.offsetsRetentionMs
+            groupCoordinator.offsetConfig.offsetsRetentionMs
           else
             offsetCommitRequest.retentionTime
 
@@ -332,7 +335,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
 
         // call coordinator to handle commit offset
-        coordinator.handleCommitOffsets(
+        groupCoordinator.handleCommitOffsets(
           offsetCommitRequest.groupId,
           offsetCommitRequest.memberId,
           offsetCommitRequest.generationId,
@@ -792,7 +795,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         java.util.Collections.emptyList())
     } else {
       createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
-        config.offsetsTopicReplicationFactor.toInt, coordinator.offsetsTopicConfigs)
+        config.offsetsTopicReplicationFactor.toInt, groupCoordinator.offsetsTopicConfigs)
     }
   }
 
@@ -946,7 +949,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         } else {
           // versions 1 and above read offsets from Kafka
           if (offsetFetchRequest.isAllPartitions) {
-            val (error, allPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId)
+            val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
             if (error != Errors.NONE)
               offsetFetchRequest.getErrorResponse(error)
             else {
@@ -957,7 +960,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           } else {
             val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
               .partition(authorizeTopicDescribe)
-            val (error, authorizedPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId,
+            val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
               Some(authorizedPartitions))
             if (error != Errors.NONE)
               offsetFetchRequest.getErrorResponse(error)
@@ -980,7 +983,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     } else {
-      val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
+      val partition = groupCoordinator.partitionFor(groupCoordinatorRequest.groupId)
 
       // get metadata (and create the topic if necessary)
       val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName)
@@ -1013,7 +1016,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
           groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
         } else {
-          val (error, summary) = coordinator.handleDescribeGroup(groupId)
+          val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
           val members = summary.members.map { member =>
             val metadata = ByteBuffer.wrap(member.metadata)
             val assignment = ByteBuffer.wrap(member.assignment)
@@ -1032,7 +1035,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
       ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
     } else {
-      val (error, groups) = coordinator.handleListGroups()
+      val (error, groups) = groupCoordinator.handleListGroups()
       val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
       new ListGroupsResponse(error, allGroups.asJava)
     }
@@ -1066,7 +1069,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // let the coordinator to handle join-group
       val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
         (protocol.name, Utils.toArray(protocol.metadata))).toList
-      coordinator.handleJoinGroup(
+      groupCoordinator.handleJoinGroup(
         joinGroupRequest.groupId,
         joinGroupRequest.memberId,
         request.header.clientId,
@@ -1090,7 +1093,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
       sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
     } else {
-      coordinator.handleSyncGroup(
+      groupCoordinator.handleSyncGroup(
         syncGroupRequest.groupId(),
         syncGroupRequest.generationId(),
         syncGroupRequest.memberId(),
@@ -1117,7 +1120,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
     else {
       // let the coordinator to handle heartbeat
-      coordinator.handleHeartbeat(
+      groupCoordinator.handleHeartbeat(
         heartbeatRequest.groupId(),
         heartbeatRequest.memberId(),
         heartbeatRequest.groupGenerationId(),
@@ -1141,7 +1144,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new Response(request, leaveGroupResponse))
     } else {
       // let the coordinator to handle leave-group
-      coordinator.handleLeaveGroup(
+      groupCoordinator.handleLeaveGroup(
         leaveGroupRequest.groupId(),
         leaveGroupRequest.memberId(),
         sendResponseCallback)
@@ -1308,6 +1311,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleInitPidRequest(request: RequestChannel.Request): Unit = {
+    val initPidRequest = request.body[InitPidRequest]
+    // Send response callback
+    def sendResponseCallback(result: InitPidResult): Unit = {
+      val responseBody: InitPidResponse = new InitPidResponse(result.error, result.pid, result.epoch)
+      trace(s"InitPidRequest : Generated new PID ${result.pid} from InitPidRequest from client ${request.header.clientId}")
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+    }
+    txnCoordinator.handleInitPid(initPidRequest.transactionalId, sendResponseCallback)
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fe6631e..0f2205f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,8 +33,8 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record.TimestampType
 
-import scala.collection.Map
 import scala.collection.JavaConverters._
+import scala.collection.Map
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
@@ -163,10 +163,14 @@ object Defaults {
   val NumReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples
   val ReplicationQuotaWindowSizeSeconds: Int = ReplicationQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
 
+  /** ********* Transaction Configuration ***********/
+  val TransactionalIdExpirationMsDefault = 604800000
+
   val DeleteTopicEnable = false
 
   val CompressionType = "producer"
 
+  val MaxIdMapSnapshots = 2
   /** ********* Kafka Metrics Configuration ***********/
   val MetricNumSamples = 2
   val MetricSampleWindowMs = 30000
@@ -194,7 +198,6 @@ object Defaults {
   val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
   val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
   val SaslKerberosPrincipalToLocalRules = SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
-
 }
 
 object KafkaConfig {
@@ -280,6 +283,7 @@ object KafkaConfig {
   val LogMessageFormatVersionProp = LogConfigPrefix + "message.format.version"
   val LogMessageTimestampTypeProp = LogConfigPrefix + "message.timestamp.type"
   val LogMessageTimestampDifferenceMaxMsProp = LogConfigPrefix + "message.timestamp.difference.max.ms"
+  val LogMaxIdMapSnapshotsProp = LogConfigPrefix + "max.id.map.snapshots"
   val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
   val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
@@ -332,6 +336,8 @@ object KafkaConfig {
   val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
   val ReplicationQuotaWindowSizeSecondsProp = "replication.quota.window.size.seconds"
+  /** ********* Transaction Configuration **********/
+  val TransactionalIdExpirationMsProp = "transactional.id.expiration.ms"
 
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
@@ -568,6 +574,11 @@ object KafkaConfig {
   val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
   val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas"
   val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
+  /** ********* Transaction Configuration ***********/
+  val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " +
+    "transaction coordinator. Note that this also influences PID expiration: PIDs are guaranteed to expire " +
+    "after expiration of this timeout from the last write by the PID (they may expire sooner if the last write " +
+    "from the PID is deleted due to the topic's retention settings)."
 
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
   val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
@@ -763,6 +774,9 @@ object KafkaConfig {
       .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
       .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc)
 
+      /** ********* Transaction configuration ***********/
+      .define(TransactionalIdExpirationMsProp, INT, Defaults.TransactionalIdExpirationMsDefault, atLeast(1), LOW, TransactionIdExpirationMsDoc)
+
       /** ********* SSL Configuration ****************/
       .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
       .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc)
@@ -989,6 +1003,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val numReplicationQuotaSamples = getInt(KafkaConfig.NumReplicationQuotaSamplesProp)
   val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp)
 
+  /** ********* Transaction Configuration **************/
+  val transactionIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)
+
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
   val compressionType = getString(KafkaConfig.CompressionTypeProp)
   val listeners: Seq[EndPoint] = getListeners

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0d3e49c..e63a6d2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -29,7 +29,7 @@ import kafka.api.KAFKA_0_9_0
 import kafka.cluster.Broker
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
 import kafka.controller.{ControllerStats, KafkaController}
-import kafka.coordinator.GroupCoordinator
+import kafka.coordinator.{GroupCoordinator, TransactionCoordinator}
 import kafka.log.{CleanerConfig, LogConfig, LogManager}
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
 import kafka.network.{BlockingChannel, SocketServer}
@@ -37,7 +37,7 @@ import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
@@ -122,6 +122,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   var groupCoordinator: GroupCoordinator = null
 
+  var transactionCoordinator: TransactionCoordinator = null
+
   var kafkaController: KafkaController = null
 
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
@@ -205,7 +207,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
 
         /* start log manager */
-        logManager = createLogManager(zkUtils.zkClient, brokerState)
+        logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time)
         logManager.startup()
 
         metadataCache = new MetadataCache(config.brokerId)
@@ -229,6 +231,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
         groupCoordinator.startup()
 
+        /* start transaction coordinator */
+        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
+        transactionCoordinator = TransactionCoordinator(config, zkUtils, Time.SYSTEM)
+        transactionCoordinator.startup()
+
         /* Get the authorizer and initialize it if one is specified.*/
         authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
           val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
@@ -237,9 +244,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         }
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
-          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
-          clusterId, time)
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId, time)
 
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
           config.numIoThreads)
@@ -403,8 +409,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         while (!shutdownSucceeded && remainingRetries > 0) {
           remainingRetries = remainingRetries - 1
 
-          import NetworkClientBlockingOps._
-
           // 1. Find the controller and establish a connection to it.
 
           // Get the current controller info. This is to ensure we use the most recent info to issue the
@@ -431,14 +435,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           if (prevController != null) {
             try {
 
-              if (!networkClient.blockingReady(node(prevController), socketTimeoutMs)(time))
+              if (!NetworkClientUtils.awaitReady(networkClient, node(prevController), time, socketTimeoutMs))
                 throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
               // send the controlled shutdown request
               val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId)
               val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
                 time.milliseconds(), true)
-              val clientResponse = networkClient.blockingSendAndReceive(request)(time)
+              val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)
 
               val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
               if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining.isEmpty) {
@@ -633,36 +637,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
 
-  private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
-    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
-    val defaultLogConfig = LogConfig(defaultProps)
-
-    val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
-      topic -> LogConfig.fromProps(defaultProps, configs)
-    }
-    // read the log configurations from zookeeper
-    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
-                                      dedupeBufferSize = config.logCleanerDedupeBufferSize,
-                                      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
-                                      ioBufferSize = config.logCleanerIoBufferSize,
-                                      maxMessageSize = config.messageMaxBytes,
-                                      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
-                                      backOffMs = config.logCleanerBackoffMs,
-                                      enableCleaner = config.logCleanerEnable)
-    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
-                   topicConfigs = configs,
-                   defaultConfig = defaultLogConfig,
-                   cleanerConfig = cleanerConfig,
-                   ioThreads = config.numRecoveryThreadsPerDataDir,
-                   flushCheckMs = config.logFlushSchedulerIntervalMs,
-                   flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
-                   flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
-                   retentionCheckMs = config.logCleanupIntervalMs,
-                   scheduler = kafkaScheduler,
-                   brokerState = brokerState,
-                   time = time)
-  }
-
   /**
     * Generates new brokerId if enabled or reads from meta.properties based on following conditions
     * <ol>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5f055a6..cce59ce 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -27,7 +27,7 @@ import kafka.api.{FetchRequest => _, _}
 import kafka.common.KafkaStorageException
 import ReplicaFetcherThread._
 import kafka.utils.Exit
-import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
@@ -248,14 +248,13 @@ class ReplicaFetcherThread(name: String,
   }
 
   private def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse = {
-    import kafka.utils.NetworkClientBlockingOps._
     try {
-      if (!networkClient.blockingReady(sourceNode, socketTimeout)(time))
+      if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
         throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
       else {
         val clientRequest = networkClient.newClientRequest(sourceBroker.id.toString, requestBuilder,
           time.milliseconds(), true)
-        networkClient.blockingSendAndReceive(clientRequest)(time)
+        NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
       }
     }
     catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
deleted file mode 100644
index 0370564..0000000
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.io.IOException
-
-import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient}
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.requests.AbstractRequest
-import org.apache.kafka.common.utils.Time
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-
-object NetworkClientBlockingOps {
-  implicit def networkClientBlockingOps(client: NetworkClient): NetworkClientBlockingOps =
-    new NetworkClientBlockingOps(client)
-}
-
-/**
- * Provides extension methods for `NetworkClient` that are useful for implementing blocking behaviour. Use with care.
- *
- * Example usage:
- *
- * {{{
- * val networkClient: NetworkClient = ...
- * import NetworkClientBlockingOps._
- * networkClient.blockingReady(...)
- * }}}
- */
-class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
-
-  /**
-    * Checks whether the node is currently connected, first calling `client.poll` to ensure that any pending
-    * disconnects have been processed.
-    *
-    * This method can be used to check the status of a connection prior to calling `blockingReady` to be able
-    * to tell whether the latter completed a new connection.
-    */
-  def isReady(node: Node)(implicit time: Time): Boolean = {
-    val currentTime = time.milliseconds()
-    client.poll(0, currentTime)
-    client.isReady(node, currentTime)
-  }
-
-  /**
-   * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll`
-   * invocations until the connection to `node` is ready, the timeout expires or the connection fails.
-   *
-   * It returns `true` if the call completes normally or `false` if the timeout expires. If the connection fails,
-   * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive
-   * connection timeout, it is possible for this method to raise an `IOException` for a previous connection which
-   * has recently disconnected.
-   *
-   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
-   * care.
-   */
-  def blockingReady(node: Node, timeout: Long)(implicit time: Time): Boolean = {
-    require(timeout >=0, "timeout should be >= 0")
-
-    val startTime = time.milliseconds()
-    val expiryTime = startTime + timeout
-
-    @tailrec
-    def awaitReady(iterationStartTime: Long): Boolean = {
-      if (client.isReady(node, iterationStartTime))
-        true
-      else if (client.connectionFailed(node))
-        throw new IOException(s"Connection to $node failed")
-      else {
-        val pollTimeout = expiryTime - iterationStartTime
-        client.poll(pollTimeout, iterationStartTime)
-        val afterPollTime = time.milliseconds()
-        if (afterPollTime < expiryTime) awaitReady(afterPollTime)
-        else false
-      }
-    }
-
-    isReady(node) || client.ready(node, startTime) || awaitReady(startTime)
-  }
-
-  /**
-   * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a
-   * disconnection happens (which can happen for a number of reasons including a request timeout).
-   *
-   * In case of a disconnection, an `IOException` is thrown.
-   *
-   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
-   * care.
-   */
-  def blockingSendAndReceive(request: ClientRequest)(implicit time: Time): ClientResponse = {
-    client.send(request, time.milliseconds())
-    pollContinuously { responses =>
-      val response = responses.find { response =>
-        response.requestHeader.correlationId == request.correlationId
-      }
-      response.foreach { r =>
-        if (r.wasDisconnected)
-          throw new IOException(s"Connection to ${request.destination} was disconnected before the response was read")
-        else if (r.versionMismatch() != null)
-          throw r.versionMismatch();
-      }
-      response
-    }
-  }
-
-  /**
-    * Invokes `client.poll` until `collect` returns `Some`. The value inside `Some` is returned.
-    *
-    * Exceptions thrown via `collect` are not handled and will bubble up.
-    *
-    * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
-    * care.
-    */
-  private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: Time): T = {
-
-    @tailrec
-    def recursivePoll: T = {
-      // rely on request timeout to ensure we don't block forever
-      val responses = client.poll(Long.MaxValue, time.milliseconds()).asScala
-      collect(responses) match {
-        case Some(result) => result
-        case None => recursivePoll
-      }
-    }
-
-    recursivePoll
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index aa55479..6ff5c5f 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -31,9 +31,6 @@ import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMars
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
@@ -65,6 +62,7 @@ object ZkUtils {
   val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election"
   val BrokerSequenceIdPath = s"$BrokersPath/seqid"
   val ConfigChangesPath = s"$ConfigPath/changes"
+  val PidBlockPath = "/latest_pid_block"
 
 
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
@@ -76,7 +74,8 @@ object ZkUtils {
                               ControllerEpochPath,
                               IsrChangeNotificationPath,
                               KafkaAclPath,
-                              KafkaAclChangesPath)
+                              KafkaAclChangesPath,
+                              PidBlockPath)
 
   def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
     val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
@@ -217,7 +216,8 @@ class ZkUtils(val zkClient: ZkClient,
                               getEntityConfigRootPath(ConfigType.Client),
                               DeleteTopicsPath,
                               BrokerSequenceIdPath,
-                              IsrChangeNotificationPath)
+                              IsrChangeNotificationPath,
+                              PidBlockPath)
 
   val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
 
@@ -529,12 +529,12 @@ class ZkUtils(val zkClient: ZkClient,
           case Some(checker) => checker(this, path, data)
           case _ =>
             debug("Checker method is not passed skipping zkData match")
-            warn("Conditional update of path %s with data %s and expected version %d failed due to %s"
+            debug("Conditional update of path %s with data %s and expected version %d failed due to %s"
               .format(path, data,expectVersion, e1.getMessage))
             (false, -1)
         }
       case e2: Exception =>
-        warn("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+        debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
           expectVersion, e2.getMessage))
         (false, -1)
     }
@@ -624,6 +624,20 @@ class ZkUtils(val zkClient: ZkClient,
     dataAndStat
   }
 
+  def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = {
+    val stat = new Stat()
+    try {
+      val data: String = zkClient.readData(path, stat)
+      if (data == null.asInstanceOf[String])
+        (None, stat.getVersion)
+      else
+      (Some(data), stat.getVersion)
+    } catch {
+      case _: ZkNoNodeException =>
+        (None, stat.getVersion)
+    }
+  }
+
   def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
 
   def getChildrenParentMayNotExist(path: String): Seq[String] = {
@@ -719,6 +733,14 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }
 
+  def getTopicPartitionCount(topic: String): Option[Int] = {
+    val topicData = getPartitionAssignmentForTopics(Seq(topic))
+    if (topicData(topic).nonEmpty)
+      Some(topicData(topic).size)
+    else
+      None
+  }
+
   def getPartitionsBeingReassigned(): Map[TopicAndPartition, ReassignedPartitionsContext] = {
     // read the partitions and their new replica list
     val jsonPartitionMapOpt = readDataMaybeNull(ReassignPartitionsPath)._1

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 852377c..5aeeefe 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -14,6 +14,7 @@
 package kafka.api
 
 import java.util.Properties
+import java.util.concurrent.Future
 
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
@@ -24,11 +25,13 @@ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
+import scala.collection.mutable.ArrayBuffer
+
 class ProducerBounceTest extends KafkaServerTestHarness {
-  private val producerBufferSize = 30000
+  private val producerBufferSize =  65536
   private val serverMessageMaxBytes =  producerBufferSize/2
 
-  val numServers = 2
+  val numServers = 4
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
@@ -36,7 +39,9 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
   // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
-
+  overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
+  overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
+  overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
   // This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
   // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
   // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
@@ -47,31 +52,19 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
   // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
   override def generateConfigs() = {
-    FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = false)
+    FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
-  private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
-  private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
-  private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
-
   private val topic1 = "topic-1"
 
   @Before
   override def setUp() {
     super.setUp()
-
-    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, bufferSize = producerBufferSize)
-    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, bufferSize = producerBufferSize)
-    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, bufferSize = producerBufferSize)
   }
 
   @After
   override def tearDown() {
-    if (producer1 != null) producer1.close
-    if (producer2 != null) producer2.close
-    if (producer3 != null) producer3.close
-
     super.tearDown()
   }
 
@@ -81,19 +74,25 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   @Test
   def testBrokerFailure() {
     val numPartitions = 3
-    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers)
+    val topicConfig = new Properties();
+    topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
+
     assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
 
     val scheduler = new ProducerScheduler()
     scheduler.start
 
     // rolling bounce brokers
+
     for (_ <- 0 until numServers) {
       for (server <- servers) {
+        info("Shutting down server : %s".format(server.config.brokerId))
         server.shutdown()
         server.awaitShutdown()
+        info("Server %s shut down. Starting it up again.".format(server.config.brokerId))
         server.startup()
-        Thread.sleep(2000)
+        info("Restarted server: %s".format(server.config.brokerId))
       }
 
       // Make sure the producer do not see any exception in returned metadata due to broker failures
@@ -121,8 +120,9 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     val messages = fetchResponses.flatMap(r => r.iterator.toList.map(_.message))
     val uniqueMessages = messages.toSet
     val uniqueMessageSize = uniqueMessages.size
-
-    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
+    info(s"number of unique messages sent: ${uniqueMessageSize}")
+    assertEquals(s"Found ${messages.size - uniqueMessageSize} duplicate messages.", uniqueMessageSize, messages.size)
+    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, messages.size)
   }
 
   private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
@@ -130,26 +130,51 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     var sent = 0
     var failed = false
 
-    val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10)
+    val producerConfig = new Properties()
+    producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    val producerConfigWithCompression = new Properties()
+    producerConfigWithCompression.putAll(producerConfig)
+    producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
+    val producers = List(
+      TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 4, retries = 10, props = Some(producerConfig)),
+      TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 2, retries = 10, lingerMs = 5000, props = Some(producerConfig)),
+      TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10, lingerMs = 10000, props = Some(producerConfigWithCompression))
+    )
 
     override def doWork(): Unit = {
-      val responses =
-        for (i <- sent+1 to sent+numRecords)
-        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes),
-                            new ErrorLoggingCallback(topic1, null, null, true))
-      val futures = responses.toList
+      info("Starting to send messages..")
+      var producerId = 0
+      val responses = new ArrayBuffer[IndexedSeq[Future[RecordMetadata]]]()
+      for (producer <- producers) {
+        val response =
+          for (i <- sent+1 to sent+numRecords)
+            yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, ((producerId + 1) * i).toString.getBytes),
+              new ErrorLoggingCallback(topic1, null, null, true))
+        responses.append(response)
+        producerId += 1
+      }
 
       try {
-        futures.map(_.get)
-        sent += numRecords
+        for (response <- responses) {
+          val futures = response.toList
+          futures.map(_.get)
+          sent += numRecords
+        }
+        info(s"Sent $sent records")
       } catch {
-        case _ : Exception => failed = true
+        case e : Exception =>
+          error(s"Got exception ${e.getMessage}")
+          e.printStackTrace()
+          failed = true
       }
     }
 
     override def shutdown(){
       super.shutdown()
-      producer.close
+      for (producer <- producers) {
+        producer.close()
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 8a198eb..61199c2 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -79,14 +79,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
     props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
 
-    // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
-    val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
-    ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
-
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret)
+    // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
+    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2))
     EasyMock.replay(zkUtils)
 
     timer = new MockTimer

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 6b1abf3..9d38485 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -69,11 +69,8 @@ class GroupMetadataManagerTest {
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
 
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
-    val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
-    ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
-
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret)
+    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2))
     EasyMock.replay(zkUtils)
 
     time = new MockTime

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala
new file mode 100644
index 0000000..da9ec47
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala
@@ -0,0 +1,105 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.coordinator
+
+import kafka.common.KafkaException
+import kafka.utils.ZkUtils
+import org.easymock.{Capture, EasyMock, IAnswer}
+import org.junit.{After, Test}
+import org.junit.Assert._
+
+class ProducerIdManagerTest {
+
+  val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+
+  @After
+  def tearDown(): Unit = {
+    EasyMock.reset(zkUtils)
+  }
+
+  @Test
+  def testGetPID() {
+    var zkVersion: Int = -1
+    var data: String = null
+    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
+      .andAnswer(new IAnswer[(Option[String], Int)] {
+        override def answer(): (Option[String], Int) = {
+          if (zkVersion == -1) {
+            (None.asInstanceOf[Option[String]], 0)
+          } else {
+            (Some(data), zkVersion)
+          }
+        }
+      })
+      .anyTimes()
+
+    val capturedVersion: Capture[Int] = EasyMock.newCapture()
+    val capturedData: Capture[String] = EasyMock.newCapture()
+    EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(),
+      EasyMock.capture(capturedData),
+      EasyMock.capture(capturedVersion),
+      EasyMock.anyObject().asInstanceOf[Option[(ZkUtils, String, String) => (Boolean,Int)]]))
+      .andAnswer(new IAnswer[(Boolean, Int)] {
+        override def answer(): (Boolean, Int) = {
+          zkVersion = capturedVersion.getValue + 1
+          data = capturedData.getValue
+
+          (true, zkVersion)
+        }
+      })
+      .anyTimes()
+
+    EasyMock.replay(zkUtils)
+
+    val manager1: ProducerIdManager = new ProducerIdManager(0, zkUtils)
+    val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils)
+
+    val pid1 = manager1.nextPid()
+    val pid2 = manager2.nextPid()
+
+    assertEquals(0, pid1)
+    assertEquals(ProducerIdManager.PidBlockSize, pid2)
+
+    for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
+      assertEquals(pid1 + i, manager1.nextPid())
+    }
+
+    for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
+      assertEquals(pid2 + i, manager2.nextPid())
+    }
+
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.nextPid())
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.nextPid())
+  }
+
+  @Test(expected = classOf[KafkaException])
+  def testExceedPIDLimit() {
+    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
+      .andAnswer(new IAnswer[(Option[String], Int)] {
+        override def answer(): (Option[String], Int) = {
+          (Some(ProducerIdManager.generatePidBlockJson(ProducerIdBlock(0,
+            Long.MaxValue - ProducerIdManager.PidBlockSize,
+            Long.MaxValue))), 0)
+        }
+      })
+      .anyTimes()
+    EasyMock.replay(zkUtils)
+    new ProducerIdManager(0, zkUtils)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala
new file mode 100644
index 0000000..f8ef5dc
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala
@@ -0,0 +1,93 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.coordinator
+
+import kafka.utils.ZkUtils
+import org.apache.kafka.common.protocol.Errors
+import org.easymock.{Capture, EasyMock, IAnswer}
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+class TransactionCoordinatorTest {
+
+  val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+
+  var zkVersion: Int = -1
+  var data: String = null
+  val capturedVersion: Capture[Int] = EasyMock.newCapture()
+  val capturedData: Capture[String] = EasyMock.newCapture()
+  EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
+    .andAnswer(new IAnswer[(Option[String], Int)] {
+      override def answer(): (Option[String], Int) = {
+        if (zkVersion == -1) {
+          (None.asInstanceOf[Option[String]], 0)
+        } else {
+          (Some(data), zkVersion)
+        }
+      }
+    })
+    .anyTimes()
+
+  EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(),
+    EasyMock.capture(capturedData),
+    EasyMock.capture(capturedVersion),
+    EasyMock.anyObject().asInstanceOf[Option[(ZkUtils, String, String) => (Boolean,Int)]]))
+    .andAnswer(new IAnswer[(Boolean, Int)] {
+      override def answer(): (Boolean, Int) = {
+        zkVersion = capturedVersion.getValue + 1
+        data = capturedData.getValue
+
+        (true, zkVersion)
+      }
+    })
+    .anyTimes()
+
+  EasyMock.replay(zkUtils)
+
+  val pidManager: ProducerIdManager = new ProducerIdManager(0, zkUtils)
+  val coordinator: TransactionCoordinator = new TransactionCoordinator(0, pidManager)
+
+  var result: InitPidResult = null
+
+  @Before
+  def setUp(): Unit = {
+    coordinator.startup()
+  }
+
+  @After
+  def tearDown(): Unit = {
+    EasyMock.reset(zkUtils)
+    coordinator.shutdown()
+  }
+
+  @Test
+  def testHandleInitPid() = {
+    coordinator.handleInitPid("", initPidMockCallback)
+    assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
+
+    coordinator.handleInitPid("", initPidMockCallback)
+    assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
+
+    coordinator.handleInitPid(null, initPidMockCallback)
+    assertEquals(InitPidResult(2L, 0, Errors.NONE), result)
+  }
+
+  def initPidMockCallback(ret: InitPidResult): Unit = {
+    result = ret
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 5f97708..49faa85 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -53,7 +53,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
     val logProps = new Properties()
     logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
     /*configure broker-side compression  */
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     /* append two messages */
     log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 3e91f96..2104842 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -322,7 +322,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
 
-      val log = new Log(dir = dir,
+      val log = new Log(dir,
                         LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)),
                         logStartOffset = 0L,
                         recoveryPoint = 0L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 05d9060..2cfcc07 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -149,7 +149,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
       logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
       logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
 
-      val log = new Log(dir = dir,
+      val log = new Log(dir,
         LogConfig(logProps),
         logStartOffset = 0L,
         recoveryPoint = 0L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 94207ec..e933c87 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -185,8 +185,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     log
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 38eb94c..928b03d 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -174,6 +174,27 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testLogCleanerRetainsLastWrittenRecordForEachPid(): Unit = {
+    val cleaner = makeCleaner(10)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    log.append(record(0, 0)) // offset 0
+    log.append(record(0, 1, pid = 1, epoch = 0, sequence = 0)) // offset 1
+    log.append(record(0, 2, pid = 2, epoch = 0, sequence = 0)) // offset 2
+    log.append(record(0, 3, pid = 3, epoch = 0, sequence = 0)) // offset 3
+    log.append(record(1, 1, pid = 2, epoch = 0, sequence = 1)) // offset 4
+
+    // roll the segment, so we can clean the messages already appended
+    log.roll()
+
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    assertEquals(immutable.List(0, 0, 1), keysInLog(log))
+    assertEquals(immutable.List(1, 3, 4), offsetsInLog(log))
+  }
+
+  @Test
   def testPartialSegmentClean(): Unit = {
     // because loadFactor is 0.75, this means we can fit 2 messages in the map
     val cleaner = makeCleaner(2)
@@ -796,8 +817,12 @@ class LogCleanerTest extends JUnitSuite {
 
   def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
 
-  def record(key: Int, value: Int): MemoryRecords =
-    record(key, value.toString.getBytes)
+
+  def record(key: Int, value: Int, pid: Long = RecordBatch.NO_PRODUCER_ID, epoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
+             sequence: Int = RecordBatch.NO_SEQUENCE): MemoryRecords = {
+    MemoryRecords.withRecords(0L, CompressionType.NONE, pid, epoch, sequence,
+      new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
+  }
 
   def record(key: Int, value: Array[Byte]) =
     TestUtils.singletonRecords(key = key.toString.getBytes, value = value)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a8e953a..1400615 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -102,7 +102,7 @@ class LogManagerTest {
     time.sleep(maxLogAgeMs + 1)
     assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes)
 
     try {
@@ -148,7 +148,7 @@ class LogManagerTest {
     time.sleep(logManager.InitialTaskDelayMs)
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes)
     try {
       log.read(0, 1024)


[3/4] kafka git commit: KAFKA-4817; Add idempotent producer semantics

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index ab81bfe..d238093 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -56,14 +56,14 @@ public class MemoryRecordsBuilder {
     private final int initPos;
     private final long baseOffset;
     private final long logAppendTime;
-    private final long producerId;
-    private final short producerEpoch;
-    private final int baseSequence;
     private final boolean isTransactional;
     private final int partitionLeaderEpoch;
     private final int writeLimit;
     private final int initialCapacity;
 
+    private long producerId;
+    private short producerEpoch;
+    private int baseSequence;
     private long writtenUncompressed = 0;
     private int numRecords = 0;
     private float compressionRate = 1;
@@ -193,6 +193,19 @@ public class MemoryRecordsBuilder {
             return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset);
     }
 
+    public void setProducerState(long pid, short epoch, int baseSequence) {
+        if (isClosed()) {
+            // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
+            // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
+            // be re queued. In this case, we should not attempt to set the state again, since changing the pid and sequence
+            // once a batch has been sent to the broker risks introducing duplicates.
+            throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
+        }
+        this.producerId = pid;
+        this.producerEpoch = epoch;
+        this.baseSequence = baseSequence;
+    }
+
     public void close() {
         if (builtRecords != null)
             return;
@@ -577,4 +590,11 @@ public class MemoryRecordsBuilder {
             this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
         }
     }
+
+    /**
+     * Return the ProducerId (PID) of the RecordBatches created by this builder.
+     */
+    public long producerId() {
+        return this.producerId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 90f1486..ae4a225 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -146,6 +146,11 @@ public interface RecordBatch extends Iterable<Record> {
     short producerEpoch();
 
     /**
+     * Does the batch have a valid producer id set.
+     */
+    boolean hasProducerId();
+
+    /**
      * Get the first sequence number of this record batch.
      * @return The first sequence number or -1 if there is none
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 3a99a8a..1638556 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -171,6 +171,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case DELETE_RECORDS:
                 request = new DeleteRecordsRequest(struct, version);
                 break;
+            case INIT_PRODUCER_ID:
+                request = new InitPidRequest(struct, version);
+                break;
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index a5d0dc4..314aa42 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -93,6 +93,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new DeleteTopicsResponse(struct);
             case DELETE_RECORDS:
                 return new DeleteRecordsResponse(struct);
+            case INIT_PRODUCER_ID:
+                return new InitPidResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
new file mode 100644
index 0000000..284107f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class InitPidRequest extends AbstractRequest {
+    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+
+    private final String transactionalId;
+
+    public static class Builder extends AbstractRequest.Builder<InitPidRequest> {
+        private final String transactionalId;
+        public Builder(String transactionalId) {
+            super(ApiKeys.INIT_PRODUCER_ID);
+            if (transactionalId != null && transactionalId.isEmpty())
+                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+            this.transactionalId = transactionalId;
+        }
+
+        @Override
+        public InitPidRequest build(short version) {
+            return new InitPidRequest(this.transactionalId, version);
+        }
+
+        @Override
+        public String toString() {
+            return "(type=InitPidRequest)";
+        }
+
+    }
+
+    public InitPidRequest(Struct struct, short version) {
+        super(version);
+        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+    }
+
+    private InitPidRequest(String transactionalId, short version) {
+        super(version);
+        this.transactionalId = transactionalId;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(Throwable e) {
+        return new InitPidResponse(Errors.forException(e));
+    }
+
+    public static InitPidRequest parse(ByteBuffer buffer, short version) {
+        return new InitPidRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
+    }
+
+    public String transactionalId() {
+        return transactionalId;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
+        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        return struct;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
new file mode 100644
index 0000000..ee92375
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.nio.ByteBuffer;
+
+public class InitPidResponse extends AbstractResponse {
+    /**
+     * Possible Error codes:
+     * OK
+     *
+     */
+    private static final String PRODUCER_ID_KEY_NAME = "pid";
+    private static final String EPOCH_KEY_NAME = "epoch";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private final Errors error;
+    private final long producerId;
+    private final short epoch;
+
+    public InitPidResponse(Errors error, long producerId, short epoch) {
+        this.error = error;
+        this.producerId = producerId;
+        this.epoch = epoch;
+    }
+
+    public InitPidResponse(Struct struct) {
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+        this.epoch = struct.getShort(EPOCH_KEY_NAME);
+    }
+
+    public InitPidResponse(Errors errors) {
+        this(errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public short epoch() {
+        return epoch;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
+        struct.set(PRODUCER_ID_KEY_NAME, producerId);
+        struct.set(EPOCH_KEY_NAME, epoch);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
+    }
+
+    public static InitPidResponse parse(ByteBuffer buffer, short version) {
+        return new InitPidResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
index 50c90a8..6efe311 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
@@ -31,6 +31,16 @@ public final class ByteUtils {
     private ByteUtils() {}
 
     /**
+     * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
+     *
+     * @param buffer The buffer to read from
+     * @return The integer read, as a long to avoid signedness
+     */
+    public static long readUnsignedInt(ByteBuffer buffer) {
+        return buffer.getInt() & 0xffffffffL;
+    }
+
+    /**
      * Read an unsigned integer from the given position without modifying the buffers position
      *
      * @param buffer the buffer to read from

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 9cc863b..9117e16 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -17,18 +17,23 @@
 package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.DefaultRecord;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.After;
@@ -89,7 +94,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
 
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize,
-                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             // append to the first batch
@@ -108,7 +113,6 @@ public class RecordAccumulatorTest {
         Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
         assertEquals(2, partitionBatches.size());
         Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
-        assertFalse(partitionBatchesIterator.next().isWritable());
         assertTrue(partitionBatchesIterator.next().isWritable());
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -129,7 +133,7 @@ public class RecordAccumulatorTest {
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
         accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -138,7 +142,7 @@ public class RecordAccumulatorTest {
     public void testLinger() throws Exception {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -157,7 +161,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testPartialDrain() throws Exception {
         RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -177,7 +181,7 @@ public class RecordAccumulatorTest {
         final int msgs = 10000;
         final int numParts = 2;
         final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -222,7 +226,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
 
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         // Just short of going over the limit so we trigger linger time
         int appends = expectedNumAppends(batchSize);
 
@@ -257,7 +261,7 @@ public class RecordAccumulatorTest {
         long lingerMs = Long.MAX_VALUE / 4;
         long retryBackoffMs = Long.MAX_VALUE / 2;
         final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
 
         long now = time.milliseconds();
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
@@ -295,7 +299,7 @@ public class RecordAccumulatorTest {
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
         final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         for (int i = 0; i < 100; i++)
             accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -329,7 +333,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions(), null);
         accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs);
 
         accum.beginFlush();
@@ -349,7 +353,7 @@ public class RecordAccumulatorTest {
         long lingerMs = Long.MAX_VALUE;
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
         final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -378,7 +382,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
 
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
         int appends = expectedNumAppends(batchSize);
 
         // Test batches not in retry
@@ -449,7 +453,7 @@ public class RecordAccumulatorTest {
         int messagesPerBatch = expectedNumAppends(1024);
 
         final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
         final AtomicInteger expiryCallbackCount = new AtomicInteger();
         final AtomicReference<Exception> unexpectedException = new AtomicReference<>();
         Callback callback = new Callback() {
@@ -490,7 +494,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
 
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions(), null);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
@@ -519,6 +523,18 @@ public class RecordAccumulatorTest {
         assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0);
     }
 
+    @Test(expected = UnsupportedVersionException.class)
+    public void testIdempotenceWithOldMagic() throws InterruptedException {
+        // Simulate talking to an older broker, ie. one which supports a lower magic.
+        ApiVersions apiVersions = new ApiVersions();
+        int batchSize = 1025;
+        apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id,
+                (short) 0, (short) 2))));
+        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
+                CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionState(time));
+        accum.append(tp1, 0L, key, value, null, 0);
+    }
+
     /**
      * Return the offset delta.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 0dea6b6..0d19aa0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
@@ -32,11 +33,14 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.InitPidRequest;
+import org.apache.kafka.common.requests.InitPidResponse;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
@@ -46,6 +50,7 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -81,24 +86,7 @@ public class SenderTest {
 
     @Before
     public void setup() {
-        Map<String, String> metricTags = new LinkedHashMap<>();
-        metricTags.put("client-id", CLIENT_ID);
-        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
-        metrics = new Metrics(metricConfig, time);
-        accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions);
-        sender = new Sender(client,
-                            metadata,
-                            this.accumulator,
-                            true,
-                            MAX_REQUEST_SIZE,
-                            ACKS_ALL,
-                            MAX_RETRIES,
-                            metrics,
-                            time,
-                            REQUEST_TIMEOUT,
-                            apiVersions);
-
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        setupWithTransactionState(null);
     }
 
     @After
@@ -244,16 +232,19 @@ public class SenderTest {
         Metrics m = new Metrics();
         try {
             Sender sender = new Sender(client,
-                                       metadata,
-                                       this.accumulator,
-                                       false,
-                                       MAX_REQUEST_SIZE,
-                                       ACKS_ALL,
-                                       maxRetries,
-                                       m,
-                                       time,
-                                       REQUEST_TIMEOUT,
-                                       apiVersions);
+                    metadata,
+                    this.accumulator,
+                    false,
+                    MAX_REQUEST_SIZE,
+                    ACKS_ALL,
+                    maxRetries,
+                    m,
+                    time,
+                    REQUEST_TIMEOUT,
+                    50,
+                    null,
+                    apiVersions
+            );
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
@@ -300,17 +291,19 @@ public class SenderTest {
         Metrics m = new Metrics();
         try {
             Sender sender = new Sender(client,
-                metadata,
-                this.accumulator,
-                true,
-                MAX_REQUEST_SIZE,
-                ACKS_ALL,
-                maxRetries,
-                m,
-                time,
-                REQUEST_TIMEOUT,
-                apiVersions);
-
+                    metadata,
+                    this.accumulator,
+                    true,
+                    MAX_REQUEST_SIZE,
+                    ACKS_ALL,
+                    maxRetries,
+                    m,
+                    time,
+                    REQUEST_TIMEOUT,
+                    50,
+                    null,
+                    apiVersions
+            );
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
             metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@@ -381,6 +374,164 @@ public class SenderTest {
         assertTrue("Request should be completed", future.isDone());
     }
 
+    @Test
+    public void testInitPidRequest() throws Exception {
+        final long producerId = 343434L;
+        TransactionState transactionState = new TransactionState(new MockTime());
+        setupWithTransactionState(transactionState);
+        client.setNode(new Node(1, "localhost", 33343));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                return body instanceof InitPidRequest;
+            }
+        }, new InitPidResponse(Errors.NONE, producerId, (short) 0));
+        sender.run(time.milliseconds());
+        assertTrue(transactionState.hasPid());
+        assertEquals(producerId, transactionState.pidAndEpoch().producerId);
+        assertEquals((short) 0, transactionState.pidAndEpoch().epoch);
+    }
+
+    @Test
+    public void testSequenceNumberIncrement() throws InterruptedException {
+        final long producerId = 343434L;
+        TransactionState transactionState = new TransactionState(new MockTime());
+        transactionState.setPidAndEpoch(producerId, (short) 0);
+        setupWithTransactionState(transactionState);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        Sender sender = new Sender(client,
+                metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                maxRetries,
+                m,
+                time,
+                REQUEST_TIMEOUT,
+                50,
+                transactionState,
+                apiVersions
+        );
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                if (body instanceof ProduceRequest) {
+                    ProduceRequest request = (ProduceRequest) body;
+                    MemoryRecords records = request.partitionRecordsOrFail().get(tp0);
+                    Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
+                    assertTrue(batchIterator.hasNext());
+                    RecordBatch batch = batchIterator.next();
+                    assertFalse(batchIterator.hasNext());
+                    assertEquals(0, batch.baseSequence());
+                    assertEquals(producerId, batch.producerId());
+                    assertEquals(0, batch.producerEpoch());
+                    return true;
+                }
+                return false;
+            }
+        }, produceResponse(tp0, 0, Errors.NONE, 0));
+
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+
+        sender.run(time.milliseconds());  // receive response
+        assertTrue(responseFuture.isDone());
+        assertEquals((long) transactionState.sequenceNumber(tp0), 1L);
+    }
+
+    @Test
+    public void testAbortRetryWhenPidChanges() throws InterruptedException {
+        final long producerId = 343434L;
+        TransactionState transactionState = new TransactionState(new MockTime());
+        transactionState.setPidAndEpoch(producerId, (short) 0);
+        setupWithTransactionState(transactionState);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        Sender sender = new Sender(client,
+                metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                maxRetries,
+                m,
+                time,
+                REQUEST_TIMEOUT,
+                50,
+                transactionState,
+                apiVersions
+        );
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+        String id = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(id), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertTrue("Client ready status should be true", client.isReady(node, 0L));
+        client.disconnect(id);
+        assertEquals(0, client.inFlightRequestCount());
+        assertFalse("Client ready status should be false", client.isReady(node, 0L));
+
+        transactionState.setPidAndEpoch(producerId + 1, (short) 0);
+        sender.run(time.milliseconds()); // receive error
+        sender.run(time.milliseconds()); // reconnect
+        sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
+        assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
+
+        KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, ""));
+        assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
+
+        assertTrue(responseFuture.isDone());
+        assertEquals((long) transactionState.sequenceNumber(tp0), 0L);
+    }
+
+    @Test
+    public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
+        final long producerId = 343434L;
+        TransactionState transactionState = new TransactionState(new MockTime());
+        transactionState.setPidAndEpoch(producerId, (short) 0);
+        setupWithTransactionState(transactionState);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        Sender sender = new Sender(client,
+                metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                maxRetries,
+                m,
+                time,
+                REQUEST_TIMEOUT,
+                50,
+                transactionState,
+                apiVersions
+        );
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
+
+        sender.run(time.milliseconds());
+        assertTrue(responseFuture.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionState.hasPid());
+    }
+
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
         assertTrue("Request should be completed", future.isDone());
         try {
@@ -397,4 +548,25 @@ public class SenderTest {
         return new ProduceResponse(partResp, throttleTimeMs);
     }
 
+    private void setupWithTransactionState(TransactionState transactionState) {
+        Map<String, String> metricTags = new LinkedHashMap<>();
+        metricTags.put("client-id", CLIENT_ID);
+        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+        this.metrics = new Metrics(metricConfig, time);
+        this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionState);
+        this.sender = new Sender(this.client,
+                this.metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                MAX_RETRIES,
+                this.metrics,
+                this.time,
+                REQUEST_TIMEOUT,
+                50,
+                transactionState,
+                apiVersions);
+        this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java
new file mode 100644
index 0000000..a8a1716
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+
+import org.apache.kafka.clients.producer.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TransactionStateTest {
+
+    private TopicPartition topicPartition;
+
+    @Before
+    public void setUp() {
+        topicPartition = new TopicPartition("topic-0", 0);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testInvalidSequenceIncrement() {
+        TransactionState transactionState = new TransactionState(new MockTime());
+        transactionState.incrementSequenceNumber(topicPartition, 3333);
+    }
+
+    @Test
+    public void testDefaultSequenceNumber() {
+        TransactionState transactionState = new TransactionState(new MockTime());
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 0);
+        transactionState.incrementSequenceNumber(topicPartition, 3);
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 3);
+    }
+
+
+    @Test
+    public void testProducerIdReset() {
+        TransactionState transactionState = new TransactionState(new MockTime());
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 0);
+        transactionState.incrementSequenceNumber(topicPartition, 3);
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 3);
+        transactionState.resetProducerId();
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 2dd5ab0..a2c761f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -355,7 +355,7 @@ public class MemoryRecordsTest {
 
     private static class RetainNonNullKeysFilter implements MemoryRecords.RecordFilter {
         @Override
-        public boolean shouldRetain(Record record) {
+        public boolean shouldRetain(RecordBatch batch, Record record) {
             return record.hasKey();
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2024f90..8a7633e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -136,6 +136,9 @@ public class RequestResponseTest {
         checkRequest(createDeleteTopicsRequest());
         checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException());
         checkResponse(createDeleteTopicsResponse(), 0);
+        checkRequest(createInitPidRequest());
+        checkErrorResponse(createInitPidRequest(), new UnknownServerException());
+        checkResponse(createInitPidResponse(), 0);
         checkOlderFetchVersions();
         checkResponse(createMetadataResponse(), 0);
         checkResponse(createMetadataResponse(), 1);
@@ -787,6 +790,14 @@ public class RequestResponseTest {
         return new DeleteTopicsResponse(errors);
     }
 
+    private InitPidRequest createInitPidRequest() {
+        return new InitPidRequest.Builder(null).build();
+    }
+
+    private InitPidResponse createInitPidResponse() {
+        return new InitPidResponse(Errors.NONE, 3332, (short) 3);
+    }
+
     private static class ByteBufferChannel implements GatheringByteChannel {
         private final ByteBuffer buf;
         private boolean closed = false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
index 0a082fb..ce23a33 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
@@ -65,6 +65,16 @@ public class ByteUtilsTest {
     }
 
     @Test
+    public void testReadUnsignedInt() {
+        ByteBuffer buffer = ByteBuffer.allocate(4);
+        long writeValue = 133444;
+        ByteUtils.writeUnsignedInt(buffer, writeValue);
+        buffer.flip();
+        long readValue = ByteUtils.readUnsignedInt(buffer);
+        assertEquals(writeValue, readValue);
+    }
+
+    @Test
     public void testWriteUnsignedIntLEToArray() {
         int value1 = 0x04030201;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index a2308b2..194cfcc 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -24,9 +24,9 @@ import kafka.cluster.Broker
 import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients._
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
@@ -180,7 +180,6 @@ class RequestSendThread(val controllerId: Int,
     def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))
 
     val QueueItem(apiKey, requestBuilder, callback) = queue.take()
-    import NetworkClientBlockingOps._
     var clientResponse: ClientResponse = null
     try {
       lock synchronized {
@@ -196,7 +195,7 @@ class RequestSendThread(val controllerId: Int,
             else {
               val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
                 time.milliseconds(), true)
-              clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
+              clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
               isSendSuccessful = true
             }
           } catch {
@@ -233,10 +232,9 @@ class RequestSendThread(val controllerId: Int,
   }
 
   private def brokerReady(): Boolean = {
-    import NetworkClientBlockingOps._
     try {
-      if (!networkClient.isReady(brokerNode)(time)) {
-        if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time))
+      if (!NetworkClientUtils.isReady(networkClient, brokerNode, time.milliseconds())) {
+        if (!NetworkClientUtils.awaitReady(networkClient, brokerNode, time, socketTimeoutMs))
           throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
         info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString))

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 9d62924..2bc0c21 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -69,7 +69,7 @@ class GroupMetadataManager(val brokerId: Int,
   private val shuttingDown = new AtomicBoolean(false)
 
   /* number of partitions for the consumer metadata topic */
-  private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
+  private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount
 
   /* single-thread scheduler to handle offset/group metadata cache loading and unloading */
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
@@ -667,16 +667,11 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   /**
-   * Gets the partition count of the offsets topic from ZooKeeper.
+   * Gets the partition count of the group metadata topic from ZooKeeper.
    * If the topic does not exist, the configured partition count is returned.
    */
-  private def getOffsetsTopicPartitionCount = {
-    val topic = Topic.GroupMetadataTopicName
-    val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
-    if (topicData(topic).nonEmpty)
-      topicData(topic).size
-    else
-      config.offsetsTopicNumPartitions
+  private def getGroupMetadataTopicPartitionCount: Int = {
+    zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName).getOrElse(config.offsetsTopicNumPartitions)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/PidMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/PidMetadata.scala b/core/src/main/scala/kafka/coordinator/PidMetadata.scala
new file mode 100644
index 0000000..fa58add
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/PidMetadata.scala
@@ -0,0 +1,31 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.coordinator
+
+import kafka.utils.nonthreadsafe
+
+@nonthreadsafe
+private[coordinator] class PidMetadata(val pid: Long) {
+
+  /* current epoch number of the PID */
+  var epoch: Short = 0
+
+  override def equals(that: Any): Boolean = that match {
+    case other: PidMetadata => pid == other.pid && epoch == other.epoch
+    case _ => false
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala
new file mode 100644
index 0000000..43b05a4
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala
@@ -0,0 +1,153 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.coordinator
+
+import kafka.common.KafkaException
+import kafka.utils.{Json, Logging, ZkUtils}
+
+/**
+  * Pid manager is part of the transaction coordinator that provides PIDs in a unique way such that the same PID will not be
+  * assigned twice across multiple transaction coordinators.
+  *
+  * Pids are managed via ZooKeeper, where the latest pid block is written on the corresponding ZK path by the manager who
+  * claims the block, where the written block_start_pid and block_end_pid are both inclusive.
+  */
+object ProducerIdManager extends Logging {
+  val CurrentVersion: Long = 1L
+  val PidBlockSize: Long = 1000L
+
+  def generatePidBlockJson(pidBlock: ProducerIdBlock): String = {
+    Json.encode(Map("version" -> CurrentVersion,
+      "broker" -> pidBlock.brokerId,
+      "block_start" -> pidBlock.blockStartPid.toString,
+      "block_end" -> pidBlock.blockEndPid.toString)
+    )
+  }
+
+  def parsePidBlockData(jsonData: String): ProducerIdBlock = {
+    try {
+      Json.parseFull(jsonData).flatMap { m =>
+        val pidBlockInfo = m.asInstanceOf[Map[String, Any]]
+        val brokerId = pidBlockInfo("broker").asInstanceOf[Int]
+        val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong
+        val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong
+        Some(ProducerIdBlock(brokerId, blockStartPID, blockEndPID))
+      }.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData"))
+    } catch {
+      case e: java.lang.NumberFormatException =>
+        // this should never happen: the written data has exceeded long type limit
+        fatal(s"Read jason data $jsonData contains pids that have exceeded long type limit")
+        throw e
+    }
+  }
+}
+
+case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) {
+  override def toString: String = {
+    val pidBlockInfo = new StringBuilder
+    pidBlockInfo.append("(brokerId:" + brokerId)
+    pidBlockInfo.append(",blockStartPID:" + blockStartPid)
+    pidBlockInfo.append(",blockEndPID:" + blockEndPid + ")")
+    pidBlockInfo.toString()
+  }
+}
+
+class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging {
+
+  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
+
+  private var currentPIDBlock: ProducerIdBlock = null
+  private var nextPID: Long = -1L
+
+  // grab the first block of PIDs
+  this synchronized {
+    getNewPidBlock()
+    nextPID = currentPIDBlock.blockStartPid
+  }
+
+  private def getNewPidBlock(): Unit = {
+    var zkWriteComplete = false
+    while (!zkWriteComplete) {
+      // refresh current pid block from zookeeper again
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+
+      // generate the new pid block
+      currentPIDBlock = dataOpt match {
+        case Some(data) =>
+          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+          debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion")
+
+          if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+            // we have exhausted all pids (wow!), treat it as a fatal error
+            fatal(s"Exhausted all pids as the next block's end pid is will has exceeded long type limit (current block end pid is ${currPIDBlock.blockEndPid})")
+            throw new KafkaException("Have exhausted all pids.")
+          }
+
+          ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize)
+        case None =>
+          debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block")
+          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+      }
+
+      val newPIDBlockData = ProducerIdManager.generatePidBlockJson(currentPIDBlock)
+
+      // try to write the new pid block into zookeeper
+      val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData))
+      zkWriteComplete = succeeded
+
+      if (zkWriteComplete)
+        info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version")
+    }
+  }
+
+  private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
+    try {
+      val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData)
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+      dataOpt match {
+        case Some(data) =>
+          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+          (currPIDBlock.equals(expectedPidBlock), zkVersion)
+        case None =>
+          (false, -1)
+      }
+    } catch {
+      case e: Exception =>
+        warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e)
+        
+        (false, -1)
+    }
+  }
+
+  def nextPid(): Long = {
+    this synchronized {
+      // grab a new block of PIDs if this block has been exhausted
+      if (nextPID > currentPIDBlock.blockEndPid) {
+        getNewPidBlock()
+        nextPID = currentPIDBlock.blockStartPid + 1
+      } else {
+        nextPID += 1
+      }
+
+      nextPID - 1
+    }
+  }
+
+  def shutdown() {
+    info(s"Shutdown complete: last PID assigned $nextPID")
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
new file mode 100644
index 0000000..41b4323
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
@@ -0,0 +1,92 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.coordinator
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, ZkUtils}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.utils.Time
+
+/**
+  * Transaction coordinator handles message transactions sent by producers and communicate with brokers
+  * to update ongoing transaction's status.
+  *
+  * Each Kafka server instantiates a transaction coordinator which is responsible for a set of
+  * producers. Producers with specific transactional ids are assigned to their corresponding coordinators;
+  * Producers with no specific transactional id may talk to a random broker as their coordinators.
+  */
+object TransactionCoordinator {
+
+  def apply(config: KafkaConfig, zkUtils: ZkUtils, time: Time): TransactionCoordinator = {
+    val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
+    new TransactionCoordinator(config.brokerId, pidManager)
+  }
+}
+
+class TransactionCoordinator(val brokerId: Int,
+                             val pidManager: ProducerIdManager) extends Logging {
+
+  this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
+
+  type InitPidCallback = InitPidResult => Unit
+
+  /* Active flag of the coordinator */
+  private val isActive = new AtomicBoolean(false)
+
+  def handleInitPid(transactionalId: String,
+                    responseCallback: InitPidCallback): Unit = {
+    if (transactionalId == null || transactionalId.isEmpty) {
+      // if the transactional id is not specified, then always blindly accept the request
+      // and return a new pid from the pid manager
+      val pid = pidManager.nextPid()
+      responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
+    } else {
+      // check if it is the assigned coordinator for the transactional id
+      responseCallback(initPidError(Errors.NOT_COORDINATOR_FOR_GROUP))
+    }
+  }
+
+  /**
+    * Startup logic executed at the same time when the server starts up.
+    */
+  def startup() {
+    info("Starting up.")
+    isActive.set(true)
+    info("Startup complete.")
+  }
+
+  /**
+    * Shutdown logic executed at the same time when server shuts down.
+    * Ordering of actions should be reversed from the startup process.
+    */
+  def shutdown() {
+    info("Shutting down.")
+    isActive.set(false)
+    pidManager.shutdown()
+    info("Shutdown complete.")
+  }
+
+  private def initPidError(error: Errors): InitPidResult = {
+    InitPidResult(pid = RecordBatch.NO_PRODUCER_ID, epoch = RecordBatch.NO_PRODUCER_EPOCH, error)
+  }
+
+}
+
+case class InitPidResult(pid: Long, epoch: Short, error: Errors)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 2a81f26..95a6896 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,30 +17,30 @@
 
 package kafka.log
 
+import java.io.{File, IOException}
+import java.text.NumberFormat
+import java.util.concurrent.atomic._
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
+
+import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_10_0_IV0
-import kafka.utils._
 import kafka.common._
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
-import java.io.{File, IOException}
-import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
-import java.util.concurrent.atomic._
-import java.text.NumberFormat
-
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ListOffsetRequest
+import org.apache.kafka.common.utils.{Time, Utils}
 
-import scala.collection.Seq
 import scala.collection.JavaConverters._
-import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.utils.{Time, Utils}
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
-import org.apache.kafka.common.TopicPartition
+import scala.collection.{Seq, mutable}
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
-    NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+    NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, Map.empty[Long, ProducerAppendInfo], false)
 }
 
 /**
@@ -56,6 +56,9 @@ object LogAppendInfo {
  * @param shallowCount The number of shallow messages
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+ * @param producerAppendInfos A map from a Pid to a ProducerAppendInfo, which is used to validate each Record in a
+ *                            RecordBatch and keep track of metadata across Records in a RecordBatch.
+ * @param isDuplicate Indicates whether the message set is a duplicate of a message at the tail of the log.
  */
 case class LogAppendInfo(var firstOffset: Long,
                          var lastOffset: Long,
@@ -66,8 +69,9 @@ case class LogAppendInfo(var firstOffset: Long,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
                          validBytes: Int,
-                         offsetsMonotonic: Boolean)
-
+                         offsetsMonotonic: Boolean,
+                         producerAppendInfos: Map[Long, ProducerAppendInfo],
+                         isDuplicate: Boolean = false)
 
 /**
  * An append-only log for storing messages.
@@ -93,7 +97,8 @@ case class LogAppendInfo(var firstOffset: Long,
  * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
  * @param scheduler The thread pool scheduler used for background actions
  * @param time The time instance used for checking the clock
- *
+ * @param maxPidExpirationMs The maximum amount of time to wait before a PID is considered expired
+ * @param pidExpirationCheckIntervalMs How often to check for PIDs which need to be expired
  */
 @threadsafe
 class Log(@volatile var dir: File,
@@ -101,7 +106,10 @@ class Log(@volatile var dir: File,
           @volatile var logStartOffset: Long = 0L,
           @volatile var recoveryPoint: Long = 0L,
           scheduler: Scheduler,
-          time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
+          time: Time = Time.SYSTEM,
+          val maxPidExpirationMs: Int = 60 * 60 * 1000,
+          val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
+          val pidSnapshotCreationIntervalMs: Int = 60 * 1000) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
 
@@ -118,10 +126,16 @@ class Log(@volatile var dir: File,
       0
   }
 
+  val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
+
   @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
 
+  /* Construct and load PID map */
+  private val pidMap = new ProducerIdMapping(config, topicPartition, dir, maxPidExpirationMs)
+
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
+
   locally {
     val startMs = time.milliseconds
 
@@ -131,13 +145,12 @@ class Log(@volatile var dir: File,
       activeSegment.size.toInt)
 
     logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+    buildAndRecoverPidMap(logEndOffset)
 
     info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
       .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs))
   }
 
-  val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
-
   private val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
 
   newGauge("NumLogSegments",
@@ -164,6 +177,19 @@ class Log(@volatile var dir: File,
     },
     tags)
 
+  scheduler.schedule(name = "PeriodicPidExpirationCheck", fun = () => {
+    lock synchronized {
+      pidMap.checkForExpiredPids(time.milliseconds)
+    }
+  }, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
+
+  scheduler.schedule(name = "PeriodicPidSnapshotTask", fun = () => {
+    lock synchronized {
+      pidMap.maybeTakeSnapshot()
+    }
+  }, period = pidSnapshotCreationIntervalMs, unit = TimeUnit.MILLISECONDS)
+
+
   /** The name of this log */
   def name  = dir.getName()
 
@@ -332,6 +358,47 @@ class Log(@volatile var dir: File,
   }
 
   /**
+    * Creates an instance of id map for this log and updates the mapping
+    * in the case it is missing some messages. Note that the id mapping
+    * starts from a snapshot that is taken strictly before the log end
+    * offset. Consequently, we need to process the tail of the log to update
+    * the mapping.
+    *
+    * @param lastOffset
+    *
+    * @return An instance of ProducerIdMapping
+    */
+  private def buildAndRecoverPidMap(lastOffset: Long) {
+    lock synchronized {
+      val currentTimeMs = time.milliseconds
+      pidMap.truncateAndReload(lastOffset, currentTimeMs)
+      logSegments(pidMap.mapEndOffset, lastOffset).foreach { segment =>
+        val startOffset = math.max(segment.baseOffset, pidMap.mapEndOffset)
+        val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
+        val records = fetchDataInfo.records
+        records.batches.asScala.foreach { batch =>
+          if (batch.hasProducerId) {
+            // TODO: Currently accessing any of the batch-level headers other than the offset
+            // or magic causes us to load the full entry into memory. It would be better if we
+            // only loaded the header
+            val numRecords = (batch.lastOffset - batch.baseOffset + 1).toInt
+            val pidEntry = ProducerIdEntry(batch.producerEpoch, batch.lastSequence, batch.lastOffset,
+              numRecords, batch.maxTimestamp)
+            pidMap.load(batch.producerId, pidEntry, currentTimeMs)
+          }
+        }
+      }
+      pidMap.cleanFrom(logStartOffset)
+    }
+  }
+
+  private[log] def activePids: Map[Long, ProducerIdEntry] = {
+    lock synchronized {
+      pidMap.activePids
+    }
+  }
+
+  /**
    * Check if we have the "clean shutdown" file
    */
   private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists()
@@ -364,10 +431,11 @@ class Log(@volatile var dir: File,
    * @return Information about the appended messages including the first and last offset.
    */
   def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
-    val appendInfo = analyzeAndValidateRecords(records)
 
-    // if we have any valid messages, append them to the log
-    if (appendInfo.shallowCount == 0)
+    val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets)
+
+    // return if we have no valid messages or if this is a duplicate of the last appended entry
+    if (appendInfo.shallowCount == 0 || appendInfo.isDuplicate)
       return appendInfo
 
     // trim any invalid bytes or partial messages before appending it to the on-disk log
@@ -433,7 +501,6 @@ class Log(@volatile var dir: File,
           maxTimestampInMessages = appendInfo.maxTimestamp,
           maxOffsetInMessages = appendInfo.lastOffset)
 
-
         // now append to the log
         segment.append(firstOffset = appendInfo.firstOffset,
           largestOffset = appendInfo.lastOffset,
@@ -441,6 +508,15 @@ class Log(@volatile var dir: File,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
           records = validRecords)
 
+        // update the PID sequence mapping
+        for ((pid, producerAppendInfo) <- appendInfo.producerAppendInfos) {
+          trace(s"Updating pid with sequence: $pid -> ${producerAppendInfo.lastEntry}")
+
+          if (assignOffsets)
+            producerAppendInfo.assignLastOffsetAndTimestamp(appendInfo.lastOffset, appendInfo.maxTimestamp)
+          pidMap.update(producerAppendInfo)
+        }
+
         // increment the log end offset
         updateLogEndOffset(appendInfo.lastOffset + 1)
 
@@ -457,7 +533,7 @@ class Log(@volatile var dir: File,
     }
   }
 
-  /*
+  /**
    * Increment the log start offset if the provided offset is larger.
    */
   def maybeIncrementLogStartOffset(offset: Long) {
@@ -476,6 +552,7 @@ class Log(@volatile var dir: File,
    * <ol>
    * <li> each message matches its CRC
    * <li> each message size is valid
+   * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other.
    * </ol>
    *
    * Also compute the following quantities:
@@ -488,7 +565,7 @@ class Log(@volatile var dir: File,
    * <li> Whether any compression codec is used (if many are used, then the last one is given)
    * </ol>
    */
-  private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = {
+  private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
     var firstOffset = -1L
@@ -497,8 +574,12 @@ class Log(@volatile var dir: File,
     var monotonic = true
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
+    var isDuplicate = false
+    val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()
 
     for (batch <- records.batches.asScala) {
+      if (isFromClient && batch.magic >= RecordBatch.MAGIC_VALUE_V2 && shallowMessageCount > 0)
+        throw new InvalidRecordException("Client produce requests should not have more than one batch")
       // update the first offset if on the first message. For magic versions older than 2, we use the last offset
       // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
       // For magic version 2, we can get the first offset directly from the batch header.
@@ -508,6 +589,7 @@ class Log(@volatile var dir: File,
       // check that offsets are monotonically increasing
       if (lastOffset >= batch.lastOffset)
         monotonic = false
+
       // update the last offset seen
       lastOffset = batch.lastOffset
 
@@ -527,19 +609,43 @@ class Log(@volatile var dir: File,
         maxTimestamp = batch.maxTimestamp
         offsetOfMaxTimestamp = lastOffset
       }
+
       shallowMessageCount += 1
       validBytesCount += batchSize
 
       val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
       if (messageCodec != NoCompressionCodec)
         sourceCodec = messageCodec
+
+      val pid = batch.producerId
+      if (pid != RecordBatch.NO_PRODUCER_ID) {
+        producerAppendInfos.get(pid) match {
+          case Some(appendInfo) => appendInfo.append(batch)
+          case None =>
+            val lastEntry = pidMap.lastEntry(pid).getOrElse(ProducerIdEntry.Empty)
+            if (isFromClient && lastEntry.isDuplicate(batch)) {
+              // This request is a duplicate so return the information about the existing entry. Note that for requests
+              // coming from the client, there will only be one RecordBatch per request, so there will be only one iteration
+              // of the loop and the values below will not be updated more than once.
+              isDuplicate = true
+              firstOffset = lastEntry.firstOffset
+              lastOffset = lastEntry.lastOffset
+              maxTimestamp = lastEntry.timestamp
+              info(s"Detected a duplicate at (firstOffset, lastOffset): (${firstOffset}, ${lastOffset}). Ignoring the incoming record.")
+            } else {
+              val producerAppendInfo = new ProducerAppendInfo(pid, lastEntry)
+              producerAppendInfos.put(pid, producerAppendInfo)
+              producerAppendInfo.append(batch)
+            }
+        }
+      }
     }
 
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
 
     LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec,
-      targetCodec, shallowMessageCount, validBytesCount, monotonic)
+      targetCodec, shallowMessageCount, validBytesCount, monotonic, producerAppendInfos.toMap, isDuplicate)
   }
 
   /**
@@ -941,6 +1047,7 @@ class Log(@volatile var dir: File,
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
         this.logStartOffset = math.min(targetOffset, this.logStartOffset)
       }
+      buildAndRecoverPidMap(targetOffset)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8ddeca9..830f906 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -26,7 +26,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, RecordBatch}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -219,8 +219,7 @@ class LogCleaner(val config: CleanerConfig,
     override def doWork() {
       cleanOrSleep()
     }
-    
-    
+
     override def shutdown() = {
     	 initiateShutdown()
     	 backOffWaitLatch.countDown()
@@ -402,7 +401,7 @@ private[log] class Cleaner(val id: Int,
         val retainDeletes = old.lastModified > deleteHorizonMs
         info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
             .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
+        cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, log.activePids, stats)
       }
 
       // trim excess index
@@ -449,9 +448,10 @@ private[log] class Cleaner(val id: Int,
                              map: OffsetMap,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
+                             activePids: Map[Long, ProducerIdEntry],
                              stats: CleanerStats) {
     val logCleanerFilter = new RecordFilter {
-      def shouldRetain(record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats)
+      def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats, activePids, recordBatch.producerId)
     }
 
     var position = 0
@@ -493,10 +493,17 @@ private[log] class Cleaner(val id: Int,
                                   map: kafka.log.OffsetMap,
                                   retainDeletes: Boolean,
                                   record: Record,
-                                  stats: CleanerStats): Boolean = {
+                                  stats: CleanerStats,
+                                  activePids: Map[Long, ProducerIdEntry],
+                                  pid: Long): Boolean = {
     if (record.isControlRecord)
       return true
 
+    // retain the entry if it is the last one produced by an active idempotent producer to ensure that
+    // the PID is not removed from the log before it has been expired
+    if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset))
+      return true
+
     val pastLatestOffset = record.offset > map.latestOffset
     if (pastLatestOffset)
       return true

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 55669c0..30bc26b 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -56,6 +56,7 @@ object Defaults {
   val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs
   val LeaderReplicationThrottledReplicas = Collections.emptyList[String]()
   val FollowerReplicationThrottledReplicas = Collections.emptyList[String]()
+  val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a555420..ec164e2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -25,9 +25,10 @@ import kafka.utils._
 import scala.collection._
 import scala.collection.JavaConverters._
 import kafka.common.{KafkaException, KafkaStorageException}
-import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
+import kafka.server._
 import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
 
+import kafka.admin.AdminUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 
@@ -51,6 +52,7 @@ class LogManager(val logDirs: Array[File],
                  val flushRecoveryOffsetCheckpointMs: Long,
                  val flushStartOffsetCheckpointMs: Long,
                  val retentionCheckMs: Long,
+                 val maxPidExpirationMs: Int,
                  scheduler: Scheduler,
                  val brokerState: BrokerState,
                  time: Time) extends Logging {
@@ -166,7 +168,14 @@ class LogManager(val logDirs: Array[File],
           val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
           val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
 
-          val current = new Log(logDir, config, logStartOffset, logRecoveryPoint, scheduler, time)
+          val current = new Log(
+            dir = logDir,
+            config = config,
+            logStartOffset = logStartOffset,
+            recoveryPoint = logRecoveryPoint,
+            maxPidExpirationMs = maxPidExpirationMs,
+            scheduler = scheduler,
+            time = time)
           if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
             this.logsToBeDeleted.add(current)
           } else {
@@ -401,7 +410,15 @@ class LogManager(val logDirs: Array[File],
         val dataDir = nextLogDir()
         val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
         dir.mkdirs()
-        val log = new Log(dir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler, time)
+
+        val log = new Log(
+          dir = dir,
+          config = config,
+          logStartOffset = 0L,
+          recoveryPoint = 0L,
+          maxPidExpirationMs = maxPidExpirationMs,
+          scheduler = scheduler,
+          time = time)
         logs.put(topicPartition, log)
         info("Created log for partition [%s,%d] in %s with properties {%s}."
           .format(topicPartition.topic,
@@ -493,7 +510,7 @@ class LogManager(val logDirs: Array[File],
       // count the number of logs in each parent directory (including 0 for empty directories
       val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
       val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
-      var dirCounts = (zeros ++ logCounts).toBuffer
+      val dirCounts = (zeros ++ logCounts).toBuffer
     
       // choose the directory with the least logs in it
       val leastLoaded = dirCounts.sortBy(_._2).head
@@ -556,3 +573,42 @@ class LogManager(val logDirs: Array[File],
     }
   }
 }
+
+object LogManager {
+  def apply(config: KafkaConfig,
+            zkUtils: ZkUtils,
+            brokerState: BrokerState,
+            kafkaScheduler: KafkaScheduler,
+            time: Time): LogManager = {
+    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
+    val defaultLogConfig = LogConfig(defaultProps)
+
+    val topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
+      topic -> LogConfig.fromProps(defaultProps, configs)
+    }
+
+    // read the log configurations from zookeeper
+    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
+      dedupeBufferSize = config.logCleanerDedupeBufferSize,
+      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
+      ioBufferSize = config.logCleanerIoBufferSize,
+      maxMessageSize = config.messageMaxBytes,
+      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
+      backOffMs = config.logCleanerBackoffMs,
+      enableCleaner = config.logCleanerEnable)
+
+    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
+      topicConfigs = topicConfigs,
+      defaultConfig = defaultLogConfig,
+      cleanerConfig = cleanerConfig,
+      ioThreads = config.numRecoveryThreadsPerDataDir,
+      flushCheckMs = config.logFlushSchedulerIntervalMs,
+      flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+      flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
+      retentionCheckMs = config.logCleanupIntervalMs,
+      maxPidExpirationMs = config.transactionIdExpirationMs,
+      scheduler = kafkaScheduler,
+      brokerState = brokerState,
+      time = time)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 94e3608..c01a5de 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -62,6 +62,7 @@ private[kafka] object LogValidator extends Logging {
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
           messageTimestampDiffMaxMs)
     } else {
+
       validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
         messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs)
     }
@@ -214,8 +215,15 @@ private[kafka] object LogValidator extends Logging {
       }
 
       if (!inPlaceAssignment) {
+        val (pid, epoch, sequence) = {
+          // note that we only reassign offsets for requests coming straight from a producer. For records with MagicV2,
+          // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
+          // with older magic versions, this will always be NO_PRODUCER_ID, etc.
+          val first = records.batches.asScala.head
+          (first.producerId, first.producerEpoch, first.baseSequence)
+        }
         buildRecordsAndAssignOffsets(messageFormatVersion, offsetCounter, messageTimestampType,
-          CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords)
+          CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence)
       } else {
         // we can update the batch only and write the compressed payload as is
         val batch = records.batches.iterator.next()
@@ -238,10 +246,12 @@ private[kafka] object LogValidator extends Logging {
 
   private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType,
                                            compressionType: CompressionType, logAppendTime: Long,
-                                           validatedRecords: Seq[Record]): ValidationAndOffsetAssignResult = {
+                                           validatedRecords: Seq[Record],
+                                           producerId: Long, epoch: Short, baseSequence: Int): ValidationAndOffsetAssignResult = {
     val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava)
     val buffer = ByteBuffer.allocate(estimatedSize)
-    val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value, logAppendTime)
+    val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
+      logAppendTime, producerId, epoch, baseSequence)
 
     validatedRecords.foreach { record =>
       builder.appendWithOffset(offsetCounter.getAndIncrement(), record)


[4/4] kafka git commit: KAFKA-4817; Add idempotent producer semantics

Posted by ju...@apache.org.
KAFKA-4817; Add idempotent producer semantics

This is from the KIP-98 proposal.

The main points of discussion surround the correctness logic, particularly the Log class where incoming entries are validated and duplicates are dropped, and also the producer error handling to ensure that the semantics are sound from the users point of view.

There is some subtlety in the idempotent producer semantics. This patch only guarantees idempotent production upto the point where an error has to be returned to the user. Once we hit a such a non-recoverable error, we can no longer guarantee message ordering nor idempotence without additional logic at the application level.

In particular, if an application wants guaranteed message order without duplicates, then it needs to do the following in the error callback:

1. Close the producer so that no queued batches are sent. This is important for guaranteeing ordering.
2. Read the tail of the log to inspect the last message committed. This is important for avoiding duplicates.

Author: Apurva Mehta <ap...@confluent.io>
Author: hachikuji <ja...@confluent.io>
Author: Apurva Mehta <ap...@gmail.com>
Author: Guozhang Wang <wa...@gmail.com>
Author: fpj <fp...@apache.org>
Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #2735 from apurvam/exactly-once-idempotent-producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bdf4cba0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bdf4cba0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bdf4cba0

Branch: refs/heads/trunk
Commit: bdf4cba047334943aa8357585c4fb379b27e9ffd
Parents: 1ce6aa5
Author: Apurva Mehta <ap...@confluent.io>
Authored: Sun Apr 2 19:41:44 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Apr 2 19:41:44 2017 -0700

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |   4 +-
 checkstyle/suppressions.xml                     |   3 +
 .../kafka/clients/NetworkClientUtils.java       | 103 +++++
 .../kafka/clients/producer/KafkaProducer.java   | 184 ++++++---
 .../kafka/clients/producer/MockProducer.java    |   1 -
 .../kafka/clients/producer/ProducerConfig.java  |  20 +-
 .../clients/producer/TransactionState.java      | 135 +++++++
 .../producer/internals/ProducerBatch.java       |  19 +-
 .../producer/internals/RecordAccumulator.java   |  53 ++-
 .../clients/producer/internals/Sender.java      | 180 +++++++--
 .../DuplicateSequenceNumberException.java       |  24 ++
 .../errors/OutOfOrderSequenceException.java     |  24 ++
 .../common/errors/ProducerFencedException.java  |  24 ++
 .../apache/kafka/common/protocol/ApiKeys.java   |   3 +-
 .../apache/kafka/common/protocol/Errors.java    |  12 +-
 .../apache/kafka/common/protocol/Protocol.java  |  25 ++
 .../kafka/common/protocol/types/Struct.java     |   4 +
 .../kafka/common/protocol/types/Type.java       |  30 ++
 .../record/AbstractLegacyRecordBatch.java       |   5 +
 .../common/record/AbstractRecordBatch.java      |   5 +
 .../kafka/common/record/MemoryRecords.java      |  20 +-
 .../common/record/MemoryRecordsBuilder.java     |  26 +-
 .../apache/kafka/common/record/RecordBatch.java |   5 +
 .../kafka/common/requests/AbstractRequest.java  |   3 +
 .../kafka/common/requests/AbstractResponse.java |   2 +
 .../kafka/common/requests/InitPidRequest.java   |  81 ++++
 .../kafka/common/requests/InitPidResponse.java  |  80 ++++
 .../apache/kafka/common/utils/ByteUtils.java    |  10 +
 .../internals/RecordAccumulatorTest.java        |  44 ++-
 .../clients/producer/internals/SenderTest.java  | 250 ++++++++++--
 .../internals/TransactionStateTest.java         |  61 +++
 .../kafka/common/record/MemoryRecordsTest.java  |   2 +-
 .../common/requests/RequestResponseTest.java    |  11 +
 .../kafka/common/utils/ByteUtilsTest.java       |  10 +
 .../controller/ControllerChannelManager.scala   |  12 +-
 .../coordinator/GroupMetadataManager.scala      |  13 +-
 .../scala/kafka/coordinator/PidMetadata.scala   |  31 ++
 .../kafka/coordinator/ProducerIdManager.scala   | 153 +++++++
 .../coordinator/TransactionCoordinator.scala    |  92 +++++
 core/src/main/scala/kafka/log/Log.scala         | 157 ++++++--
 core/src/main/scala/kafka/log/LogCleaner.scala  |  19 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |   1 +
 core/src/main/scala/kafka/log/LogManager.scala  |  64 ++-
 .../src/main/scala/kafka/log/LogValidator.scala |  16 +-
 .../scala/kafka/log/ProducerIdMapping.scala     | 394 +++++++++++++++++++
 .../src/main/scala/kafka/server/KafkaApis.scala |  52 ++-
 .../main/scala/kafka/server/KafkaConfig.scala   |  21 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  54 +--
 .../kafka/server/ReplicaFetcherThread.scala     |   7 +-
 .../kafka/utils/NetworkClientBlockingOps.scala  | 145 -------
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  36 +-
 .../kafka/api/ProducerBounceTest.scala          |  85 ++--
 .../GroupCoordinatorResponseTest.scala          |   7 +-
 .../coordinator/GroupMetadataManagerTest.scala  |   5 +-
 .../coordinator/ProducerIdManagerTest.scala     | 105 +++++
 .../TransactionCoordinatorTest.scala            |  93 +++++
 .../unit/kafka/log/BrokerCompressionTest.scala  |   2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   2 +-
 .../log/LogCleanerLagIntegrationTest.scala      |   2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |   4 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  29 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 323 ++++++++++++---
 .../unit/kafka/log/ProducerIdMappingTest.scala  | 224 +++++++++++
 .../kafka/server/ReplicationQuotasTest.scala    |   1 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |   3 +-
 .../scala/unit/kafka/utils/ZkUtilsTest.scala    |  15 +
 tests/kafkatest/services/verifiable_producer.py |  16 +-
 tests/kafkatest/tests/core/replication_test.py  |  24 +-
 .../kafkatest/tests/produce_consume_validate.py |  13 +-
 70 files changed, 3124 insertions(+), 563 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index b775025..6a263cc 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -98,7 +98,7 @@
     <module name="MethodLength"/>
     <module name="ParameterNumber">
       <!-- default is 8 -->
-      <property name="max" value="12"/>
+      <property name="max" value="13"/>
     </module>
     <module name="ClassDataAbstractionCoupling">
       <!-- default is 7 -->
@@ -115,7 +115,7 @@
     </module>
     <module name="CyclomaticComplexity">
       <!-- default is 10-->
-      <property name="max" value="15"/>
+      <property name="max" value="16"/>
     </module>
     <module name="JavaNCSS">
       <!-- default is 50 -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ea1619e..7bc55c8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -88,6 +88,9 @@
     <suppress checks="ClassFanOutComplexity"
               files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/>
 
+    <suppress checks="JavaNCSS"
+              files="RequestResponseTest.java"/>
+
     <!-- Connect -->
     <suppress checks="ClassFanOutComplexity"
               files="DistributedHerder.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
new file mode 100644
index 0000000..8462979
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Time;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Provides additional utilities for {@link NetworkClient} (e.g. to implement blocking behaviour).
+ */
+public class NetworkClientUtils {
+
+    /**
+     * Checks whether the node is currently connected, first calling `client.poll` to ensure that any pending
+     * disconnects have been processed.
+     *
+     * This method can be used to check the status of a connection prior to calling the blocking version to be able
+     * to tell whether the latter completed a new connection.
+     */
+    public static boolean isReady(KafkaClient client, Node node, long currentTime) {
+        client.poll(0, currentTime);
+        return client.isReady(node, currentTime);
+    }
+
+    /**
+     * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll`
+     * invocations until the connection to `node` is ready, the timeoutMs expires or the connection fails.
+     *
+     * It returns `true` if the call completes normally or `false` if the timeoutMs expires. If the connection fails,
+     * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive
+     * connection timeoutMs, it is possible for this method to raise an `IOException` for a previous connection which
+     * has recently disconnected.
+     *
+     * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+     * care.
+     */
+    public static boolean awaitReady(KafkaClient client, Node node, Time time, long timeoutMs) throws IOException {
+        if (timeoutMs < 0) {
+            throw new IllegalArgumentException("Timeout needs to be greater than 0");
+        }
+        long startTime = time.milliseconds();
+        long expiryTime = startTime + timeoutMs;
+
+        if (isReady(client, node, startTime) ||  client.ready(node, startTime))
+            return true;
+
+        long attemptStartTime = time.milliseconds();
+        while (!client.isReady(node, attemptStartTime) && attemptStartTime < expiryTime) {
+            if (client.connectionFailed(node)) {
+                throw new IOException("Connection to " + node + " failed.");
+            }
+            long pollTimeout = expiryTime - attemptStartTime;
+            client.poll(pollTimeout, attemptStartTime);
+            attemptStartTime = time.milliseconds();
+        }
+        return client.isReady(node, attemptStartTime);
+    }
+
+    /**
+     * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a
+     * disconnection happens (which can happen for a number of reasons including a request timeout).
+     *
+     * In case of a disconnection, an `IOException` is thrown.
+     *
+     * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+     * care.
+     */
+    public static ClientResponse sendAndReceive(KafkaClient client, ClientRequest request, Time time) throws IOException {
+        client.send(request, time.milliseconds());
+        while (true) {
+            List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
+            for (ClientResponse response : responses) {
+                if (response.requestHeader().correlationId() == request.correlationId()) {
+                    if (response.wasDisconnected()) {
+                        throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");
+                    }
+                    if (response.versionMismatch() != null) {
+                        throw response.versionMismatch();
+                    }
+                    return response;
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 78dd668..9342791 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -159,6 +159,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final int requestTimeoutMs;
     private final ProducerInterceptors<K, V> interceptors;
     private final ApiVersions apiVersions;
+    private final TransactionState transactionState;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -185,7 +186,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
-             keySerializer, valueSerializer);
+                keySerializer, valueSerializer);
     }
 
     /**
@@ -208,7 +209,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
-             keySerializer, valueSerializer);
+                keySerializer, valueSerializer);
     }
 
     @SuppressWarnings({"unchecked", "deprecation"})
@@ -218,7 +219,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
             this.time = Time.SYSTEM;
-
             clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
@@ -255,48 +255,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                     ProducerInterceptor.class);
             this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
-
             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
-            /* check for user defined settings.
-             * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
-             * This should be removed with release 0.9 when the deprecated configs are removed.
-             */
-            if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
-                log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
-                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
-                if (blockOnBufferFull) {
-                    this.maxBlockTimeMs = Long.MAX_VALUE;
-                } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
-                    log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
-                            "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                    this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-                } else {
-                    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                }
-            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
-                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
-                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-            } else {
-                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            }
 
-            /* check for user defined settings.
-             * If the TIME_OUT config is set use that for request timeout.
-             * This should be removed with release 0.9
-             */
-            if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
-                log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
-                        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-                this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
-            } else {
-                this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-            }
+            this.maxBlockTimeMs = configureMaxBlockTime(config, userProvidedConfigs);
+            this.requestTimeoutMs = configureRequestTimeout(config, userProvidedConfigs);
+            this.transactionState = configureTransactionState(config, time);
+            int retries = configureRetries(config, transactionState != null);
+            int maxInflightRequests = configureInflightRequests(config, transactionState != null);
+            short acks = configureAcks(config, transactionState != null);
 
             this.apiVersions = new ApiVersions();
             this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
@@ -306,8 +276,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     retryBackoffMs,
                     metrics,
                     time,
-                    apiVersions);
-
+                    apiVersions,
+                    transactionState);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
@@ -316,7 +286,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                             this.metrics, time, "producer", channelBuilder),
                     this.metadata,
                     clientId,
-                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
+                    maxInflightRequests,
                     config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
@@ -327,33 +297,131 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,
-                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
+                    maxInflightRequests == 1,
                     config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
-                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
-                    config.getInt(ProducerConfig.RETRIES_CONFIG),
+                    acks,
+                    retries,
                     this.metrics,
                     Time.SYSTEM,
                     this.requestTimeoutMs,
+                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
+                    this.transactionState,
                     apiVersions);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
-
             this.errors = this.metrics.sensor("errors");
-
-
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
             log.debug("Kafka producer started");
         } catch (Throwable t) {
-            // call close methods if internal objects are already constructed
-            // this is to prevent resource leak. see KAFKA-2121
+            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
             close(0, TimeUnit.MILLISECONDS, true);
             // now propagate the exception
             throw new KafkaException("Failed to construct kafka producer", t);
         }
     }
 
+    private static long configureMaxBlockTime(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
+        /* check for user defined settings.
+         * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
+         * This should be removed with release 0.9 when the deprecated configs are removed.
+         */
+        if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
+            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
+                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
+            boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
+            if (blockOnBufferFull) {
+                return Long.MAX_VALUE;
+            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
+                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
+                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
+                return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+            } else {
+                return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+            }
+        } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
+            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
+                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
+            return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+        } else {
+            return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+        }
+    }
+
+    private static int configureRequestTimeout(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
+        /* check for user defined settings.
+         * If the TIME_OUT config is set use that for request timeout.
+         * This should be removed with release 0.9
+         */
+        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
+            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
+                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            return config.getInt(ProducerConfig.TIMEOUT_CONFIG);
+        } else {
+            return config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        }
+    }
+
+    private static TransactionState configureTransactionState(ProducerConfig config, Time time) {
+        boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+        if (idempotenceEnabled) {
+            return new TransactionState(time);
+        } else {
+            return null;
+        }
+    }
+
+    private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled) {
+        boolean userConfiguredRetries = false;
+        if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) {
+            userConfiguredRetries = true;
+        }
+        if (idempotenceEnabled && !userConfiguredRetries) {
+            log.info("Overriding the default retries config to " + 3 + " since the idempotent producer is enabled.");
+            return 3;
+        }
+        if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) {
+            throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+        }
+        return config.getInt(ProducerConfig.RETRIES_CONFIG);
+    }
+
+    private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
+        boolean userConfiguredInflights = false;
+        if (config.originals().containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+            userConfiguredInflights = true;
+        }
+        if (idempotenceEnabled && !userConfiguredInflights) {
+            log.info("Overriding the default " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 since idempontence is enabled.");
+            return 1;
+        }
+        if (idempotenceEnabled && config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) != 1) {
+            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 in order" +
+                    "to use the idempotent producer. Otherwise we cannot guarantee idempotence.");
+        }
+        return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+    }
+
+    private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled) {
+        boolean userConfiguredAcks = false;
+        short acks = (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG));
+        if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) {
+            userConfiguredAcks = true;
+        }
+
+        if (idempotenceEnabled && !userConfiguredAcks) {
+            log.info("Overriding the default " + ProducerConfig.ACKS_CONFIG + " to all since idempotence is enabled");
+            return -1;
+        }
+
+        if (idempotenceEnabled && acks != -1) {
+            throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
+                    "producer. Otherwise we cannot guarantee idempotence");
+        }
+        return acks;
+    }
+
     private static int parseAcks(String acksString) {
         try {
             return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());
@@ -587,14 +655,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private void ensureValidRecordSize(int size) {
         if (size > this.maxRequestSize)
             throw new RecordTooLargeException("The message is " + size +
-                                              " bytes when serialized which is larger than the maximum request size you have configured with the " +
-                                              ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
-                                              " configuration.");
+                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
+                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
+                    " configuration.");
         if (size > this.totalMemorySize)
             throw new RecordTooLargeException("The message is " + size +
-                                              " bytes when serialized which is larger than the total memory buffer you have configured with the " +
-                                              ProducerConfig.BUFFER_MEMORY_CONFIG +
-                                              " configuration.");
+                    " bytes when serialized which is larger than the total memory buffer you have configured with the " +
+                    ProducerConfig.BUFFER_MEMORY_CONFIG +
+                    " configuration.");
     }
 
     /**
@@ -706,7 +774,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (timeout > 0) {
             if (invokedFromCallback) {
                 log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
-                    "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
+                        "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
             } else {
                 // Try to close gracefully.
                 if (this.sender != null)
@@ -724,7 +792,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
         if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
             log.info("Proceeding to force close the producer since pending requests could not be completed " +
-                "within timeout {} ms.", timeout);
+                    "within timeout {} ms.", timeout);
             this.sender.forceClose();
             // Only join the sender thread when not calling from callback.
             if (!invokedFromCallback) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 22797b6..ab09997 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -37,7 +37,6 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-
 /**
  * A mock of the producer interface you can use for testing code that uses Kafka.
  * <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 70b80c7..d6e03d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -224,6 +224,14 @@ public class ProducerConfig extends AbstractConfig {
                                                         + "Implementing the <code>ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "
                                                         + "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
 
+    /** <code>enable.idempotence</code> */
+    public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
+    public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
+                                                        + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. This is set to 'false' by default. "
+                                                        + "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be set to 1 and "
+                                                        + "<code>" + RETRIES_CONFIG + "</code> cannot be zero. Additionally " + ACKS_CONFIG + " must be set to 'all'. If these values "
+                                                        + "are left at their defaults, we will override the default to be suitable. "
+                                                        + "If the values are set to something incompatible with the idempotent producer, a ConfigException will be thrown.";
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -306,21 +314,23 @@ public class ProducerConfig extends AbstractConfig {
                                         null,
                                         Importance.LOW,
                                         INTERCEPTOR_CLASSES_DOC)
-
-                                // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
                                         Importance.MEDIUM,
                                         CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                                 .withClientSslSupport()
-                                .withClientSaslSupport();
-
+                                .withClientSaslSupport()
+                                .define(ENABLE_IDEMPOTENCE_CONFIG,
+                                        Type.BOOLEAN,
+                                        false,
+                                        Importance.LOW,
+                                        ENABLE_IDEMPOTENCE_DOC);
     }
 
     public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
                                                             Serializer<?> keySerializer, Serializer<?> valueSerializer) {
-        Map<String, Object> newConfigs = new HashMap<String, Object>();
+        Map<String, Object> newConfigs = new HashMap<>();
         newConfigs.putAll(configs);
         if (keySerializer != null)
             newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java b/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java
new file mode 100644
index 0000000..fa30b3f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A class which maintains state for transactions. Also keeps the state necessary to ensure idempotent production.
+ */
+public class TransactionState {
+    private volatile PidAndEpoch pidAndEpoch;
+    private final Map<TopicPartition, Integer> sequenceNumbers;
+    private final Time time;
+
+    public static class PidAndEpoch {
+        public final long producerId;
+        public final short epoch;
+
+        PidAndEpoch(long producerId, short epoch) {
+            this.producerId = producerId;
+            this.epoch = epoch;
+        }
+
+        public boolean isValid() {
+            return NO_PRODUCER_ID < producerId;
+        }
+    }
+
+    public TransactionState(Time time) {
+        this.pidAndEpoch = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+        this.sequenceNumbers = new HashMap<>();
+        this.time = time;
+    }
+
+    public boolean hasPid() {
+        return pidAndEpoch.isValid();
+    }
+
+    /**
+     * A blocking call to get the pid and epoch for the producer. If the PID and epoch has not been set, this method
+     * will block for at most maxWaitTimeMs. It is expected that this method be called from application thread
+     * contexts (ie. through Producer.send). The PID it self will be retrieved in the background thread.
+     * @param maxWaitTimeMs The maximum time to block.
+     * @return a PidAndEpoch object. Callers must call the 'isValid' method fo the returned object to ensure that a
+     *         valid Pid and epoch is actually returned.
+     */
+    public synchronized PidAndEpoch awaitPidAndEpoch(long maxWaitTimeMs) throws InterruptedException {
+        long start = time.milliseconds();
+        long elapsed = 0;
+        while (!hasPid() && elapsed < maxWaitTimeMs) {
+            wait(maxWaitTimeMs);
+            elapsed = time.milliseconds() - start;
+        }
+        return pidAndEpoch;
+    }
+
+    /**
+     * Get the current pid and epoch without blocking. Callers must use {@link PidAndEpoch#isValid()} to
+     * verify that the result is valid.
+     *
+     * @return the current PidAndEpoch.
+     */
+    public PidAndEpoch pidAndEpoch() {
+        return pidAndEpoch;
+    }
+
+    /**
+     * Set the pid and epoch atomically. This method will signal any callers blocked on the `pidAndEpoch` method
+     * once the pid is set. This method will be called on the background thread when the broker responds with the pid.
+     */
+    public synchronized void setPidAndEpoch(long pid, short epoch) {
+        this.pidAndEpoch = new PidAndEpoch(pid, epoch);
+        if (this.pidAndEpoch.isValid())
+            notifyAll();
+    }
+
+    /**
+     * This method is used when the producer needs to reset it's internal state because of an irrecoverable exception
+     * from the broker.
+     *
+     * We need to reset the producer id and associated state when we have sent a batch to the broker, but we either get
+     * a non-retriable exception or we run out of retries, or the batch expired in the producer queue after it was already
+     * sent to the broker.
+     *
+     * In all of these cases, we don't know whether batch was actually committed on the broker, and hence whether the
+     * sequence number was actually updated. If we don't reset the producer state, we risk the chance that all future
+     * messages will return an OutOfOrderSequenceException.
+     */
+    public synchronized void resetProducerId() {
+        setPidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+        this.sequenceNumbers.clear();
+    }
+
+    /**
+     * Returns the next sequence number to be written to the given TopicPartition.
+     */
+    public synchronized Integer sequenceNumber(TopicPartition topicPartition) {
+        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
+        if (currentSequenceNumber == null) {
+            currentSequenceNumber = 0;
+            sequenceNumbers.put(topicPartition, currentSequenceNumber);
+        }
+        return currentSequenceNumber;
+    }
+
+    public synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
+        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
+        if (currentSequenceNumber == null)
+            throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence.");
+
+        currentSequenceNumber += increment;
+        sequenceNumbers.put(topicPartition, currentSequenceNumber);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 2b90f81..4f68c20 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.AbstractRecords;
@@ -66,6 +67,7 @@ public final class ProducerBatch {
         this.lastAppendTime = createdMs;
         this.produceFuture = new ProduceRequestResult(topicPartition);
         this.completed = new AtomicBoolean();
+        this.retry = false;
     }
 
     /**
@@ -208,7 +210,7 @@ public final class ProducerBatch {
     /**
      * Returns if the batch is been retried for sending to kafka
      */
-    private boolean inRetry() {
+    public boolean inRetry() {
         return this.retry;
     }
 
@@ -228,10 +230,18 @@ public final class ProducerBatch {
         return recordsBuilder.isFull();
     }
 
+    public void setProducerState(TransactionState.PidAndEpoch pidAndEpoch, int baseSequence) {
+        recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
+    }
+
     public void close() {
         recordsBuilder.close();
     }
 
+    public boolean isClosed() {
+        return recordsBuilder.isClosed();
+    }
+
     public ByteBuffer buffer() {
         return recordsBuilder.buffer();
     }
@@ -247,4 +257,11 @@ public final class ProducerBatch {
     public byte magic() {
         return recordsBuilder.magic();
     }
+
+    /**
+     * Return the ProducerId (Pid) of the current batch.
+     */
+    public long producerId() {
+        return recordsBuilder.producerId();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 1e495d4..e07d201 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -18,11 +18,13 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -30,9 +32,9 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
@@ -80,6 +82,7 @@ public final class RecordAccumulator {
     // The following variables are only accessed by the sender thread, so we don't need to protect them.
     private final Set<TopicPartition> muted;
     private int drainIndex;
+    private final TransactionState transactionState;
 
     /**
      * Create a new record accumulator
@@ -95,6 +98,8 @@ public final class RecordAccumulator {
      * @param metrics The metrics
      * @param time The time instance to use
      * @param apiVersions Request API versions for current connected brokers
+     * @param transactionState The shared transaction state object which tracks Pids, epochs, and sequence numbers per
+     *                         partition.
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
@@ -103,7 +108,8 @@ public final class RecordAccumulator {
                              long retryBackoffMs,
                              Metrics metrics,
                              Time time,
-                             ApiVersions apiVersions) {
+                             ApiVersions apiVersions,
+                             TransactionState transactionState) {
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
@@ -119,6 +125,7 @@ public final class RecordAccumulator {
         this.muted = new HashSet<>();
         this.time = time;
         this.apiVersions = apiVersions;
+        this.transactionState = transactionState;
         registerMetrics(metrics, metricGrpName);
     }
 
@@ -202,8 +209,7 @@ public final class RecordAccumulator {
                     return appendResult;
                 }
 
-                MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, maxUsableMagic, compression,
-                        TimestampType.CREATE_TIME, this.batchSize);
+                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                 ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                 FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
 
@@ -222,17 +228,24 @@ public final class RecordAccumulator {
         }
     }
 
+    private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
+        if (transactionState != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
+            throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
+                    "support the required message format (v2). The broker must be version 0.11 or later.");
+        }
+        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, this.batchSize);
+    }
+
     /**
-     * If `ProducerBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
-     * resources (like compression streams buffers).
+     *  Try to append to a ProducerBatch. If it is full, we return null and a new batch is created. If the existing batch is
+     *  full, it will be closed right before send, or if it is expired, or when the producer is closed, whichever
+     *  comes first.
      */
     private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<ProducerBatch> deque) {
         ProducerBatch last = deque.peekLast();
         if (last != null) {
             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
-            if (future == null)
-                last.close();
-            else
+            if (future != null)
                 return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
         }
         return null;
@@ -404,7 +417,7 @@ public final class RecordAccumulator {
                 TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                 // Only proceed if the partition has no in-flight batches.
                 if (!muted.contains(tp)) {
-                    Deque<ProducerBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
+                    Deque<ProducerBatch> deque = getDeque(tp);
                     if (deque != null) {
                         synchronized (deque) {
                             ProducerBatch first = deque.peekFirst();
@@ -418,7 +431,27 @@ public final class RecordAccumulator {
                                         // request
                                         break;
                                     } else {
+                                        TransactionState.PidAndEpoch pidAndEpoch = null;
+                                        if (transactionState != null) {
+                                            pidAndEpoch = transactionState.pidAndEpoch();
+                                            if (!pidAndEpoch.isValid())
+                                                // we cannot send the batch until we have refreshed the PID
+                                                break;
+                                        }
+
                                         ProducerBatch batch = deque.pollFirst();
+                                        if (pidAndEpoch != null && !batch.inRetry()) {
+                                            // If the batch is in retry, then we should not change the pid and
+                                            // sequence number, since this may introduce duplicates. In particular,
+                                            // the previous attempt may actually have been accepted, and if we change
+                                            // the pid and sequence here, this attempt will also be accepted, causing
+                                            // a duplicate.
+                                            int sequenceNumber = transactionState.sequenceNumber(batch.topicPartition);
+                                            log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
+                                                    node, pidAndEpoch.producerId, pidAndEpoch.epoch,
+                                                    batch.topicPartition, sequenceNumber);
+                                            batch.setProducerState(pidAndEpoch, sequenceNumber);
+                                        }
                                         batch.close();
                                         size += batch.sizeInBytes();
                                         ready.add(batch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index a7394e1..ab92522 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -22,11 +22,13 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -37,8 +39,11 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.InitPidRequest;
+import org.apache.kafka.common.requests.InitPidResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.Time;
@@ -46,6 +51,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -96,9 +102,15 @@ public class Sender implements Runnable {
     /* the max time to wait for the server to respond to the request*/
     private final int requestTimeout;
 
+    /* The max time to wait before retrying a request which has failed */
+    private final long retryBackoffMs;
+
     /* current request API versions supported by the known brokers */
     private final ApiVersions apiVersions;
 
+    /* all the state related to transactions, in particular the PID, epoch, and sequence numbers */
+    private final TransactionState transactionState;
+
     public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
@@ -109,6 +121,8 @@ public class Sender implements Runnable {
                   Metrics metrics,
                   Time time,
                   int requestTimeout,
+                  long retryBackoffMs,
+                  TransactionState transactionState,
                   ApiVersions apiVersions) {
         this.client = client;
         this.accumulator = accumulator;
@@ -121,7 +135,9 @@ public class Sender implements Runnable {
         this.time = time;
         this.sensors = new SenderMetrics(metrics);
         this.requestTimeout = requestTimeout;
+        this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
+        this.transactionState = transactionState;
     }
 
     /**
@@ -172,6 +188,9 @@ public class Sender implements Runnable {
      */
     void run(long now) {
         Cluster cluster = metadata.fetch();
+
+        maybeWaitForPid();
+
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 
@@ -197,10 +216,8 @@ public class Sender implements Runnable {
         }
 
         // create produce requests
-        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster,
-                                                                         result.readyNodes,
-                                                                         this.maxRequestSize,
-                                                                         now);
+        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
+                this.maxRequestSize, now);
         if (guaranteeMessageOrder) {
             // Mute all the partitions drained
             for (List<ProducerBatch> batchList : batches.values()) {
@@ -210,9 +227,22 @@ public class Sender implements Runnable {
         }
 
         List<ProducerBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
-        // update sensors
-        for (ProducerBatch expiredBatch : expiredBatches)
+
+        boolean needsTransactionStateReset = false;
+        // Reset the PID if an expired batch has previously been sent to the broker. Also update the metrics
+        // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
+        // we need to reset the producer id here.
+        for (ProducerBatch expiredBatch : expiredBatches) {
+            if (transactionState != null && expiredBatch.inRetry()) {
+                needsTransactionStateReset = true;
+            }
             this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
+        }
+
+        if (needsTransactionStateReset) {
+            transactionState.resetProducerId();
+            return;
+        }
 
         sensors.updateProduceRequestMetrics(batches);
 
@@ -253,6 +283,50 @@ public class Sender implements Runnable {
         initiateClose();
     }
 
+    private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException {
+        String nodeId = node.idString();
+        InitPidRequest.Builder builder = new InitPidRequest.Builder(null);
+        ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null);
+        return NetworkClientUtils.sendAndReceive(client, request, time);
+    }
+
+    private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException {
+        Node node = client.leastLoadedNode(time.milliseconds());
+        if (NetworkClientUtils.awaitReady(client, node, time, remainingTimeMs)) {
+            return node;
+        }
+        return null;
+    }
+
+    private void maybeWaitForPid() {
+        if (transactionState == null)
+            return;
+
+        while (!transactionState.hasPid()) {
+            try {
+                Node node = awaitLeastLoadedNodeReady(requestTimeout);
+                if (node != null) {
+                    ClientResponse response = sendAndAwaitInitPidRequest(node);
+                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
+                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
+                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
+                    } else {
+                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
+                                "We will back off and try again.", node);
+                    }
+                } else {
+                    log.debug("Could not find an available broker to send InitPidRequest to. " +
+                            "We will back off and try again.");
+                }
+            } catch (Exception e) {
+                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
+            }
+            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
+            time.sleep(retryBackoffMs);
+            metadata.requestUpdate();
+        }
+    }
+
     /**
      * Handle a produce response
      */
@@ -300,32 +374,55 @@ public class Sender implements Runnable {
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now) {
         Errors error = response.error;
-        if (error != Errors.NONE && canRetry(batch, error)) {
-            // retry
-            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
-                     correlationId,
-                     batch.topicPartition,
-                     this.retries - batch.attempts() - 1,
-                     error);
-            this.accumulator.reenqueue(batch, now);
-            this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
-        } else {
-            RuntimeException exception;
-            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
-                exception = new TopicAuthorizationException(batch.topicPartition.topic());
-            else
-                exception = error.exception();
-            // tell the user the result of their request
-            batch.done(response.baseOffset, response.logAppendTime, exception);
-            this.accumulator.deallocate(batch);
-            if (error != Errors.NONE)
+        if (error != Errors.NONE) {
+            if (canRetry(batch, error)) {
+                log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
+                        correlationId,
+                        batch.topicPartition,
+                        this.retries - batch.attempts() - 1,
+                        error);
+                if (transactionState == null) {
+                    reenqueueBatch(batch, now);
+                } else if (transactionState.pidAndEpoch().producerId == batch.producerId()) {
+                    // If idempotence is enabled only retry the request if the current PID is the same as the pid of the batch.
+                    log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
+                            transactionState.sequenceNumber(batch.topicPartition));
+                    reenqueueBatch(batch, now);
+                } else {
+                    failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
+                            "batch but the producer id changed from " + batch.producerId() + " to " +
+                            transactionState.pidAndEpoch().producerId + " in the mean time. This batch will be dropped."));
+                    this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
+                }
+            } else {
+                final RuntimeException exception;
+                if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
+                    exception = new TopicAuthorizationException(batch.topicPartition.topic());
+                else
+                    exception = error.exception();
+                if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionState.pidAndEpoch().producerId)
+                    log.error("The broker received an out of order sequence number for correlation id {}, topic-partition " +
+                                    "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
+                            correlationId, batch.topicPartition, response.baseOffset);
+                // tell the user the result of their request
+                failBatch(batch, response, exception);
                 this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
-        }
-        if (error.exception() instanceof InvalidMetadataException) {
-            if (error.exception() instanceof UnknownTopicOrPartitionException)
-                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
-                        "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
-            metadata.requestUpdate();
+            }
+            if (error.exception() instanceof InvalidMetadataException) {
+                if (error.exception() instanceof UnknownTopicOrPartitionException)
+                    log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
+                            "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
+                metadata.requestUpdate();
+            }
+
+        } else {
+            completeBatch(batch, response);
+
+            if (transactionState != null && transactionState.pidAndEpoch().producerId == batch.producerId()) {
+                transactionState.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+                log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
+                        transactionState.sequenceNumber(batch.topicPartition));
+            }
         }
 
         // Unmute the completed partition.
@@ -333,6 +430,27 @@ public class Sender implements Runnable {
             this.accumulator.unmutePartition(batch.topicPartition);
     }
 
+    private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
+        this.accumulator.reenqueue(batch, currentTimeMs);
+        this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
+    }
+
+    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
+        batch.done(response.baseOffset, response.logAppendTime, null);
+        this.accumulator.deallocate(batch);
+    }
+
+    private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) {
+        if (transactionState != null && batch.producerId() == transactionState.pidAndEpoch().producerId) {
+            // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
+            // about the previously committed message. Note that this will discard the producer id and sequence
+            // numbers for all existing partitions.
+            transactionState.resetProducerId();
+        }
+        batch.done(response.baseOffset, response.logAppendTime, exception);
+        this.accumulator.deallocate(batch);
+    }
+
     /**
      * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
new file mode 100644
index 0000000..469ba98
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class DuplicateSequenceNumberException extends RetriableException {
+
+    public DuplicateSequenceNumberException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java b/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
new file mode 100644
index 0000000..1c1cc6b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class OutOfOrderSequenceException extends ApiException {
+
+    public OutOfOrderSequenceException(String msg) {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java b/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
new file mode 100644
index 0000000..c699d8e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class ProducerFencedException extends ApiException {
+
+    public ProducerFencedException(String msg) {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 89b2000..4183638 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -46,7 +46,8 @@ public enum ApiKeys {
     API_VERSIONS(18, "ApiVersions"),
     CREATE_TOPICS(19, "CreateTopics"),
     DELETE_TOPICS(20, "DeleteTopics"),
-    DELETE_RECORDS(21, "DeleteRecords");
+    DELETE_RECORDS(21, "DeleteRecords"),
+    INIT_PRODUCER_ID(22, "InitProducerId");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a2a33ee..519e52c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.ControllerMovedException;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.GroupLoadInProgressException;
@@ -39,10 +40,12 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidRequiredAcksException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
 import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
@@ -166,12 +169,15 @@ public enum Errors {
             " the message was sent to an incompatible broker. See the broker logs for more details.")),
     UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
         new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
-    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy."));
+    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
+    OUT_OF_ORDER_SEQUENCE_NUMBER(45, new OutOfOrderSequenceException("The broker received an out of order sequence number")),
+    DUPLICATE_SEQUENCE_NUMBER(46, new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
+    PRODUCER_FENCED(47, new ProducerFencedException("Producer attempted an operation with an old epoch"));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
-    private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
-    private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
+    private static Map<Class<?>, Errors> classToError = new HashMap<>();
+    private static Map<Short, Errors> codeToError = new HashMap<>();
 
     static {
         for (Errors error : Errors.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 5d7004a..37eb75c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1176,6 +1176,29 @@ public class Protocol {
     public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
     public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
 
+    /* Transactions API */
+    public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
+            new Field("transactional_id",
+                    NULLABLE_STRING,
+                    "The transactional id whose pid we want to retrieve or generate.")
+    );
+
+    public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
+            new Field("error_code",
+                    INT16,
+                    "An integer error code."),
+            new Field("pid",
+                    INT64,
+                    "The pid for the input transactional id. If the input id was empty, then this is used only for ensuring idempotence of messages"),
+            new Field("epoch",
+                    INT16,
+                    "The epoch for the pid. Will always be 0 if no transactional id was specified in the request.")
+    );
+
+    public static final Schema[] INIT_PRODUCER_ID_REQUEST = new Schema[] {INIT_PRODUCER_ID_REQUEST_V0};
+
+    public static final Schema[] INIT_PRODUCER_ID_RESPONSE = new Schema[] {INIT_PRODUCER_ID_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1208,6 +1231,7 @@ public class Protocol {
         REQUESTS[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_REQUEST;
         REQUESTS[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_REQUEST;
         REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
+        REQUESTS[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1231,6 +1255,7 @@ public class Protocol {
         RESPONSES[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_RESPONSE;
         RESPONSES[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_RESPONSE;
         RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
+        RESPONSES[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 325690d..c42390b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -134,6 +134,10 @@ public class Struct {
         return (Integer) get(name);
     }
 
+    public Long getUnsignedInt(String name) {
+        return (Long) get(name);
+    }
+
     public Long getLong(Field field) {
         return (Long) get(field);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index ffca09c..57d31f4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -194,6 +194,36 @@ public abstract class Type {
         }
     };
 
+    public static final Type UNSIGNED_INT32 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            ByteUtils.writeUnsignedInt(buffer, (long) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return ByteUtils.readUnsignedInt(buffer);
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 4;
+        }
+
+        @Override
+        public String toString() {
+            return "UINT32";
+        }
+
+        @Override
+        public Long validate(Object item) {
+            if (item instanceof Long)
+                return (Long) item;
+            else
+                throw new SchemaException(item + " is not a Long.");
+        }
+    };
+
     public static final Type INT64 = new Type() {
         @Override
         public void write(ByteBuffer buffer, Object o) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 1b74a7d..7e09b93 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -175,6 +175,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
     }
 
     @Override
+    public boolean hasProducerId() {
+        return false;
+    }
+
+    @Override
     public long sequence() {
         return RecordBatch.NO_SEQUENCE;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
index 53245e7..78ad050 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
@@ -19,6 +19,11 @@ package org.apache.kafka.common.record;
 abstract class AbstractRecordBatch implements RecordBatch {
 
     @Override
+    public boolean hasProducerId() {
+        return RecordBatch.NO_PRODUCER_ID < producerId();
+    }
+
+    @Override
     public long nextOffset() {
         return lastOffset() + 1;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index e59d9fd..8c4e771 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -147,7 +147,7 @@ public class MemoryRecords extends AbstractRecords {
 
                 messagesRead += 1;
 
-                if (filter.shouldRetain(record)) {
+                if (filter.shouldRetain(batch, record)) {
                     // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
                     // the corrupted batch with correct data.
                     if (!record.hasMagic(batchMagic))
@@ -245,7 +245,7 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public interface RecordFilter {
-        boolean shouldRetain(Record record);
+        boolean shouldRetain(RecordBatch recordBatch, Record record);
     }
 
     public static class FilterResult {
@@ -338,13 +338,27 @@ public class MemoryRecords extends AbstractRecords {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, records);
     }
 
+    public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, Long pid,
+                                            short epoch, int baseSequence, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
+                pid, epoch, baseSequence, records);
+    }
+
     public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
                                             TimestampType timestampType, SimpleRecord... records) {
+        return withRecords(magic, initialOffset, compressionType, timestampType, RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, records);
+    }
+
+    private static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
+                                             TimestampType timestampType, long pid, short epoch, int baseSequence,
+                                             SimpleRecord ... records) {
         if (records.length == 0)
             return MemoryRecords.EMPTY;
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
         ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
-        MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset);
+        MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
+                System.currentTimeMillis(), pid, epoch, baseSequence);
         for (SimpleRecord record : records)
             builder.append(record);
         return builder.build();