You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/06 18:51:13 UTC

[4/6] kafka git commit: KAFKA-5121; Implement transaction index for KIP-98

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b7f340f..5722a43 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -30,9 +30,10 @@ import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.ListOffsetRequest
+import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, mutable}
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -40,10 +41,13 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
 import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import java.util.Map.{Entry => JEntry}
+import java.lang.{Long => JLong}
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
-    NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, Map.empty[Long, ProducerAppendInfo], false)
+    NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
@@ -59,9 +63,6 @@ object LogAppendInfo {
  * @param shallowCount The number of shallow messages
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
- * @param producerAppendInfos A map from a Pid to a ProducerAppendInfo, which is used to validate each Record in a
- *                            RecordBatch and keep track of metadata across Records in a RecordBatch.
- * @param isDuplicate Indicates whether the message set is a duplicate of a message at the tail of the log.
  */
 case class LogAppendInfo(var firstOffset: Long,
                          var lastOffset: Long,
@@ -72,9 +73,19 @@ case class LogAppendInfo(var firstOffset: Long,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
                          validBytes: Int,
-                         offsetsMonotonic: Boolean,
-                         producerAppendInfos: Map[Long, ProducerAppendInfo],
-                         isDuplicate: Boolean = false)
+                         offsetsMonotonic: Boolean)
+
+/**
+ * A class used to hold useful metadata about a completed transaction. This is used to build
+ * the transaction index after appending to the log.
+ *
+ * @param producerId The ID of the producer
+ * @param firstOffset The first offset (inclusive) of the transaction
+ * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the
+ *                   COMMIT/ABORT control record which indicates the transaction's completion.
+ * @param isAborted Whether or not the transaction was aborted
+ */
+case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean)
 
 /**
  * An append-only log for storing messages.
@@ -111,8 +122,7 @@ class Log(@volatile var dir: File,
           scheduler: Scheduler,
           time: Time = Time.SYSTEM,
           val maxPidExpirationMs: Int = 60 * 60 * 1000,
-          val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
-          val pidSnapshotIntervalMs: Int = 60 * 1000) extends Logging with KafkaMetricsGroup {
+          val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
 
@@ -133,8 +143,10 @@ class Log(@volatile var dir: File,
 
   @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
 
-  /* Construct and load PID map */
-  private val pidMap = new ProducerIdMapping(config, topicPartition, dir, maxPidExpirationMs)
+  /* The earliest offset which is part of an incomplete transaction. This is used to compute the LSO. */
+  @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None
+
+  private val producerStateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs)
 
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@@ -147,7 +159,7 @@ class Log(@volatile var dir: File,
     loadSegments()
 
     /* Calculate the offset of the next message */
-    nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
+    nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset,
       activeSegment.size.toInt)
 
     leaderEpochCache.clearLatest(nextOffsetMetadata.messageOffset)
@@ -157,7 +169,7 @@ class Log(@volatile var dir: File,
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
     leaderEpochCache.clearEarliest(logStartOffset)
 
-    buildAndRecoverPidMap(logEndOffset)
+    loadProducerState(logEndOffset)
 
     info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
       .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs))
@@ -189,19 +201,12 @@ class Log(@volatile var dir: File,
     },
     tags)
 
-  scheduler.schedule(name = "PeriodicPidExpirationCheck", fun = () => {
+  scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
     lock synchronized {
-      pidMap.removeExpiredPids(time.milliseconds)
+      producerStateManager.removeExpiredProducers(time.milliseconds)
     }
   }, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
 
-  scheduler.schedule(name = "PeriodicPidSnapshotTask", fun = () => {
-    lock synchronized {
-      pidMap.maybeTakeSnapshot()
-    }
-  }, period = pidSnapshotIntervalMs, unit = TimeUnit.MILLISECONDS)
-
-
   /** The name of this log */
   def name  = dir.getName()
 
@@ -212,13 +217,10 @@ class Log(@volatile var dir: File,
       new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir)))
   }
 
-  /* Load the log segments from the log files on disk */
-  private def loadSegments() {
+  private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
     var swapFiles = Set[File]()
 
-    // first do a pass through the files in the log directory and remove any temporary files
-    // and find any interrupted swap operations
-    for(file <- dir.listFiles if file.isFile) {
+    for (file <- dir.listFiles if file.isFile) {
       if(!file.canRead)
         throw new IOException("Could not read file " + file)
       val filename = file.getName
@@ -229,48 +231,51 @@ class Log(@volatile var dir: File,
         // we crashed in the middle of a swap operation, to recover:
         // if a log, delete the .index file, complete the swap operation later
         // if an index just delete it, it will be rebuilt
-        val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
-        if(baseName.getPath.endsWith(IndexFileSuffix)) {
+        val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
+        if (isIndexFile(baseFile)) {
           Files.deleteIfExists(file.toPath)
-        } else if(baseName.getPath.endsWith(LogFileSuffix)){
-          // delete the index
-          val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
-          Files.deleteIfExists(index.toPath())
+        } else if (isLogFile(baseFile)) {
+          // delete the index files
+          val offset = offsetFromFilename(baseFile.getName)
+          Files.deleteIfExists(Log.offsetIndexFile(dir, offset).toPath)
+          Files.deleteIfExists(Log.timeIndexFile(dir, offset).toPath)
+          Files.deleteIfExists(Log.transactionIndexFile(dir, offset).toPath)
           swapFiles += file
         }
       }
     }
+    swapFiles
+  }
 
-    // now do a second pass and load all the .log and all index files
-    for(file <- dir.listFiles if file.isFile) {
+  private def loadSegmentFiles(): Unit = {
+    // load segments in ascending order because transactional data from one segment may depend on the
+    // segments that come before it
+    for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
       val filename = file.getName
-      if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) {
+      if (isIndexFile(file)) {
         // if it is an index file, make sure it has a corresponding .log file
-        val logFile =
-          if (filename.endsWith(TimeIndexFileSuffix))
-            new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix))
-          else
-            new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
-
-        if(!logFile.exists) {
+        val offset = offsetFromFilename(filename)
+        val logFile = logFilename(dir, offset)
+        if (!logFile.exists) {
           warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
           Files.deleteIfExists(file.toPath)
         }
-      } else if(filename.endsWith(LogFileSuffix)) {
-        // if its a log file, load the corresponding log segment
+      } else if (isLogFile(file)) {
+        // if it's a log file, load the corresponding log segment
         val startOffset = offsetFromFilename(filename)
-        val indexFile = Log.indexFilename(dir, startOffset)
-        val timeIndexFile = Log.timeIndexFilename(dir, startOffset)
+        val indexFile = Log.offsetIndexFile(dir, startOffset)
+        val timeIndexFile = Log.timeIndexFile(dir, startOffset)
+        val txnIndexFile = Log.transactionIndexFile(dir, startOffset)
 
         val indexFileExists = indexFile.exists()
         val timeIndexFileExists = timeIndexFile.exists()
         val segment = new LogSegment(dir = dir,
-                                     startOffset = startOffset,
-                                     indexIntervalBytes = config.indexInterval,
-                                     maxIndexSize = config.maxIndexSize,
-                                     rollJitterMs = config.randomSegmentJitter,
-                                     time = time,
-                                     fileAlreadyExists = true)
+          startOffset = startOffset,
+          indexIntervalBytes = config.indexInterval,
+          maxIndexSize = config.maxIndexSize,
+          rollJitterMs = config.randomSegmentJitter,
+          time = time,
+          fileAlreadyExists = true)
 
         if (indexFileExists) {
           try {
@@ -279,25 +284,43 @@ class Log(@volatile var dir: File,
             if (!timeIndexFileExists)
               segment.timeIndex.resize(0)
             segment.timeIndex.sanityCheck()
+            segment.txnIndex.sanityCheck()
           } catch {
             case e: java.lang.IllegalArgumentException =>
               warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
-                s"${indexFile.getAbsolutePath} and rebuilding index...")
+                s"${indexFile.getAbsolutePath}, and ${txnIndexFile.getAbsolutePath} and rebuilding index...")
               Files.deleteIfExists(timeIndexFile.toPath)
               Files.delete(indexFile.toPath)
-              segment.recover(config.maxMessageSize)
+              segment.txnIndex.delete()
+              recoverSegment(segment)
           }
         } else {
-          error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
-          segment.recover(config.maxMessageSize)
+          error("Could not find offset index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
+          recoverSegment(segment)
         }
         segments.put(startOffset, segment)
       }
     }
+  }
+
+  private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
+    val stateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs)
+    stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds)
+    logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment =>
+      val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset)
+      val fetchDataInfo = segment.read(startOffset, None, Int.MaxValue)
+      if (fetchDataInfo != null)
+        loadProducersFromLog(stateManager, fetchDataInfo.records)
+    }
+    val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache)
 
-    // Finally, complete any interrupted swap operations. To be crash-safe,
-    // log files that are replaced by the swap segment should be renamed to .deleted
-    // before the swap file is restored as the new segment file.
+    // once we have recovered the segment's data, take a snapshot to ensure that we won't
+    // need to reload the same segment again while recovering another segment.
+    stateManager.takeSnapshot()
+    bytesTruncated
+  }
+
+  private def completeSwapOperations(swapFiles: Set[File]): Unit = {
     for (swapFile <- swapFiles) {
       val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
       val filename = logFile.getName
@@ -306,18 +329,36 @@ class Log(@volatile var dir: File,
       val index =  new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
       val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
       val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+      val txnIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TxnIndexFileSuffix) + SwapFileSuffix)
+      val txnIndex = new TransactionIndex(startOffset, txnIndexFile)
       val swapSegment = new LogSegment(FileRecords.open(swapFile),
-                                       index = index,
-                                       timeIndex = timeIndex,
-                                       baseOffset = startOffset,
-                                       indexIntervalBytes = config.indexInterval,
-                                       rollJitterMs = config.randomSegmentJitter,
-                                       time = time)
+        index = index,
+        timeIndex = timeIndex,
+        txnIndex = txnIndex,
+        baseOffset = startOffset,
+        indexIntervalBytes = config.indexInterval,
+        rollJitterMs = config.randomSegmentJitter,
+        time = time)
       info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath))
-      swapSegment.recover(config.maxMessageSize)
-      val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset)
+      recoverSegment(swapSegment)
+      val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset())
       replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
     }
+  }
+
+  /* Load the log segments from the log files on disk */
+  private def loadSegments() {
+    // first do a pass through the files in the log directory and remove any temporary files
+    // and find any interrupted swap operations
+    val swapFiles = removeTempFilesAndCollectSwapFiles()
+
+    // now do a second pass and load all the log and index files
+    loadSegmentFiles()
+
+    // Finally, complete any interrupted swap operations. To be crash-safe,
+    // log files that are replaced by the swap segment should be renamed to .deleted
+    // before the swap file is restored as the new segment file.
+    completeSwapOperations(swapFiles)
 
     if(logSegments.isEmpty) {
       // no existing segments, create a new mutable segment beginning at offset 0
@@ -330,13 +371,11 @@ class Log(@volatile var dir: File,
                                      fileAlreadyExists = false,
                                      initFileSize = this.initFileSize(),
                                      preallocate = config.preallocate))
-    } else {
-      if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
-        recoverLog()
-        // reset the index size of the currently active log segment to allow more entries
-        activeSegment.index.resize(config.maxIndexSize)
-        activeSegment.timeIndex.resize(config.maxIndexSize)
-      }
+    } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
+      recoverLog()
+      // reset the index size of the currently active log segment to allow more entries
+      activeSegment.index.resize(config.maxIndexSize)
+      activeSegment.timeIndex.resize(config.maxIndexSize)
     }
   }
 
@@ -347,66 +386,72 @@ class Log(@volatile var dir: File,
   private def recoverLog() {
     // if we have the clean shutdown marker, skip recovery
     if(hasCleanShutdownFile) {
-      this.recoveryPoint = activeSegment.nextOffset
+      this.recoveryPoint = activeSegment.nextOffset()
       return
     }
 
     // okay we need to actually recovery this log
     val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
     while(unflushed.hasNext) {
-      val curr = unflushed.next
-      info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
+      val segment = unflushed.next
+      info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name))
       val truncatedBytes =
         try {
-          curr.recover(config.maxMessageSize, Some(leaderEpochCache))
+          recoverSegment(segment, Some(leaderEpochCache))
         } catch {
           case _: InvalidOffsetException =>
-            val startOffset = curr.baseOffset
+            val startOffset = segment.baseOffset
             warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
                  "creating an empty one with starting offset " + startOffset)
-            curr.truncateTo(startOffset)
+            segment.truncateTo(startOffset)
         }
       if(truncatedBytes > 0) {
         // we had an invalid message, delete all remaining log
-        warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
+        warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name,
+          segment.nextOffset()))
         unflushed.foreach(deleteSegment)
       }
     }
   }
 
-  /**
-    * Creates an instance of id map for this log and updates the mapping
-    * in the case it is missing some messages. Note that the id mapping
-    * starts from a snapshot that is taken strictly before the log end
-    * offset. Consequently, we need to process the tail of the log to update
-    * the mapping.
-    */
-  private def buildAndRecoverPidMap(lastOffset: Long) {
-    lock synchronized {
-      info(s"Recovering PID mapping from offset $lastOffset for partition $topicPartition")
-      val currentTimeMs = time.milliseconds
-      pidMap.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)
-      logSegments(pidMap.mapEndOffset, lastOffset).foreach { segment =>
-        val startOffset = math.max(segment.baseOffset, pidMap.mapEndOffset)
+  private def loadProducerState(lastOffset: Long): Unit = lock synchronized {
+    info(s"Loading producer state from offset $lastOffset for partition $topicPartition")
+    val currentTimeMs = time.milliseconds
+    producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)
+
+    // only do the potentially expensive reloading if the last snapshot offset is lower than the
+    // log end offset (which would be the case on first startup) and there are active producers.
+    // if there are no active producers, then truncating shouldn't change that fact (although it
+    // could cause a producerId to expire earlier than expected), so we can skip the loading.
+    // This is an optimization for users who are not yet using idempotent/transactional features.
+    if (lastOffset > producerStateManager.mapEndOffset || !producerStateManager.isEmpty) {
+      logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
+        val startOffset = math.max(segment.baseOffset, producerStateManager.mapEndOffset)
         val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
-        if (fetchDataInfo != null) {
-          fetchDataInfo.records.batches.asScala.foreach { batch =>
-            if (batch.hasProducerId) {
-              val pidEntry = ProducerIdEntry(batch.producerEpoch, batch.lastSequence, batch.lastOffset,
-                batch.lastSequence - batch.baseSequence, batch.maxTimestamp)
-              pidMap.load(batch.producerId, pidEntry, currentTimeMs)
-            }
-          }
-        }
+        if (fetchDataInfo != null)
+          loadProducersFromLog(producerStateManager, fetchDataInfo.records)
       }
-      pidMap.updateMapEndOffset(lastOffset)
     }
+
+    producerStateManager.updateMapEndOffset(lastOffset)
+    updateFirstUnstableOffset()
   }
 
-  private[log] def activePids: Map[Long, ProducerIdEntry] = {
-    lock synchronized {
-      pidMap.activePids
+  private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = {
+    val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
+    val completedTxns = ListBuffer.empty[CompletedTxn]
+    records.batches.asScala.foreach { batch =>
+      if (batch.hasProducerId) {
+        val lastEntry = producerStateManager.lastEntry(batch.producerId)
+        updateProducers(batch, loadedProducers, completedTxns, lastEntry, loadingFromLog = true)
+      }
     }
+    loadedProducers.values.foreach(producerStateManager.update)
+    completedTxns.foreach(producerStateManager.completeTxn)
+  }
+
+  private[log] def activePids: Map[Long, ProducerIdEntry] = lock synchronized {
+    producerStateManager.activeProducers
   }
 
   /**
@@ -426,47 +471,50 @@ class Log(@volatile var dir: File,
   def close() {
     debug(s"Closing log $name")
     lock synchronized {
+      producerStateManager.takeSnapshot()
       logSegments.foreach(_.close())
     }
   }
 
   /**
-    * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
-    * @param records The records to append
-    * @throws KafkaStorageException If the append fails due to an I/O error.
-    * @return Information about the appended messages including the first and last offset.
-    */
-  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int): LogAppendInfo = {
-    append(records, assignOffsets = true, leaderEpoch)
+   * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
+   * @param records The records to append
+   * @param isFromClient Whether or not this append is from a producer
+   * @throws KafkaStorageException If the append fails due to an I/O error.
+   * @return Information about the appended messages including the first and last offset.
+   */
+  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
+    append(records, isFromClient, assignOffsets = true, leaderEpoch)
   }
 
   /**
-    * Append this message set to the active segment of the log without assigning offsets or Partition Leader Epochs
-    * @param records The records to append
-    * @throws KafkaStorageException If the append fails due to an I/O error.
-    * @return Information about the appended messages including the first and last offset.
-    */
+   * Append this message set to the active segment of the log without assigning offsets or Partition Leader Epochs
+   * @param records The records to append
+   * @throws KafkaStorageException If the append fails due to an I/O error.
+   * @return Information about the appended messages including the first and last offset.
+   */
   def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
-    append(records, assignOffsets = false, leaderEpoch = -1)
+    append(records, isFromClient = false, assignOffsets = false, leaderEpoch = -1)
   }
 
   /**
-    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
-    *
-    * This method will generally be responsible for assigning offsets to the messages,
-    * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
-    *
-    * @param records The log records to append
-    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
-    * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
-    * @throws KafkaStorageException If the append fails due to an I/O error.
-    * @return Information about the appended messages including the first and last offset.
-    */
-  private def append(records: MemoryRecords, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = {
-    val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets)
+   * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
+   *
+   * This method will generally be responsible for assigning offsets to the messages,
+   * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
+   *
+   * @param records The log records to append
+   * @param isFromClient Whether or not this append is from a producer
+   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
+   * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
+   * @throws KafkaStorageException If the append fails due to an I/O error.
+   * @return Information about the appended messages including the first and last offset.
+   */
+  private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = {
+    val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
 
     // return if we have no valid messages or if this is a duplicate of the last appended entry
-    if (appendInfo.shallowCount == 0 || appendInfo.isDuplicate)
+    if (appendInfo.shallowCount == 0)
       return appendInfo
 
     // trim any invalid bytes or partial messages before appending it to the on-disk log
@@ -483,15 +531,16 @@ class Log(@volatile var dir: File,
           val now = time.milliseconds
           val validateAndOffsetAssignResult = try {
             LogValidator.validateMessagesAndAssignOffsets(validRecords,
-                                                          offset,
-                                                          now,
-                                                          appendInfo.sourceCodec,
-                                                          appendInfo.targetCodec,
-                                                          config.compact,
-                                                          config.messageFormatVersion.messageFormatVersion,
-                                                          config.messageTimestampType,
-                                                          config.messageTimestampDifferenceMaxMs,
-                                                          leaderEpoch)
+              offset,
+              now,
+              appendInfo.sourceCodec,
+              appendInfo.targetCodec,
+              config.compact,
+              config.messageFormatVersion.messageFormatVersion,
+              config.messageTimestampType,
+              config.messageTimestampDifferenceMaxMs,
+              leaderEpoch,
+              isFromClient)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
@@ -534,34 +583,56 @@ class Log(@volatile var dir: File,
             .format(validRecords.sizeInBytes, config.segmentSize))
         }
 
+        // now that we have valid records, offsets assigned, and timestamps updated, we need to
+        // validate the idempotent/transactional state of the producers and collect some metadata
+        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
+        maybeDuplicate.foreach { duplicate =>
+          appendInfo.firstOffset = duplicate.firstOffset
+          appendInfo.lastOffset = duplicate.lastOffset
+          appendInfo.logAppendTime = duplicate.timestamp
+          return appendInfo
+        }
+
         // maybe roll the log if this segment is full
         val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
           maxTimestampInMessages = appendInfo.maxTimestamp,
           maxOffsetInMessages = appendInfo.lastOffset)
 
-        // now append to the log
+        val logOffsetMetadata = LogOffsetMetadata(
+          messageOffset = appendInfo.firstOffset,
+          segmentBaseOffset = segment.baseOffset,
+          relativePositionInSegment = segment.size)
+
         segment.append(firstOffset = appendInfo.firstOffset,
           largestOffset = appendInfo.lastOffset,
           largestTimestamp = appendInfo.maxTimestamp,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
           records = validRecords)
 
-        // update the PID sequence mapping
-        for ((pid, producerAppendInfo) <- appendInfo.producerAppendInfos) {
-          trace(s"Updating pid with sequence: $pid -> ${producerAppendInfo.lastEntry}")
+        // update the producer state
+        for ((producerId, producerAppendInfo) <- updatedProducers) {
+          trace(s"Updating producer $producerId state: ${producerAppendInfo.lastEntry}")
+          producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+          producerStateManager.update(producerAppendInfo)
+        }
 
-          if (assignOffsets)
-            producerAppendInfo.assignLastOffsetAndTimestamp(appendInfo.lastOffset, appendInfo.maxTimestamp)
-          pidMap.update(producerAppendInfo)
+        // update the transaction index with the true last stable offset. The last offset visible
+        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
+        for (completedTxn <- completedTxns) {
+          val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+          segment.updateTxnIndex(completedTxn, lastStableOffset)
         }
 
         // always update the last pid map offset so that the snapshot reflects the current offset
         // even if there isn't any idempotent data being written
-        pidMap.updateMapEndOffset(appendInfo.lastOffset + 1)
+        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
 
         // increment the log end offset
         updateLogEndOffset(appendInfo.lastOffset + 1)
 
+        // update the first unstable offset (which is used to compute LSO)
+        updateFirstUnstableOffset()
+
         trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
           .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))
 
@@ -575,6 +646,24 @@ class Log(@volatile var dir: File,
     }
   }
 
+  def onHighWatermarkIncremented(highWatermark: Long): Unit = {
+    lock synchronized {
+      producerStateManager.onHighWatermarkUpdated(highWatermark)
+      updateFirstUnstableOffset()
+    }
+  }
+
+  private def updateFirstUnstableOffset(): Unit = lock synchronized {
+    this.firstUnstableOffset = producerStateManager.firstUnstableOffset match {
+      case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly =>
+        val offset = logOffsetMetadata.messageOffset
+        val segment = segments.floorEntry(offset).getValue
+        val position  = segment.translateOffset(offset)
+        Some(LogOffsetMetadata(offset, segment.baseOffset, position.position))
+      case other => other
+    }
+  }
+
   /**
    * Increment the log start offset if the provided offset is larger.
    */
@@ -589,6 +678,23 @@ class Log(@volatile var dir: File,
     }
   }
 
+  private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
+  (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[ProducerIdEntry]) = {
+    val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
+    val completedTxns = ListBuffer.empty[CompletedTxn]
+    for (batch <- records.batches.asScala if batch.hasProducerId) {
+      val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
+
+      // if this is a client produce request, there will be only one batch. If that batch matches
+      // the last appended entry for that producer, then this request is a duplicate and we return
+      // the last appended entry to the client.
+      if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch)))
+        return (updatedProducers, completedTxns.toList, maybeLastEntry)
+      updateProducers(batch, updatedProducers, completedTxns, maybeLastEntry, loadingFromLog = false)
+    }
+    (updatedProducers, completedTxns.toList, None)
+  }
+
   /**
    * Validate the following:
    * <ol>
@@ -616,8 +722,6 @@ class Log(@volatile var dir: File,
     var monotonic = true
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
-    var isDuplicate = false
-    val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()
 
     for (batch <- records.batches.asScala) {
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
@@ -660,37 +764,23 @@ class Log(@volatile var dir: File,
       val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
       if (messageCodec != NoCompressionCodec)
         sourceCodec = messageCodec
-
-      val pid = batch.producerId
-      if (pid != RecordBatch.NO_PRODUCER_ID) {
-        producerAppendInfos.get(pid) match {
-          case Some(appendInfo) => appendInfo.append(batch)
-          case None =>
-            val lastEntry = pidMap.lastEntry(pid).getOrElse(ProducerIdEntry.Empty)
-            if (isFromClient && lastEntry.isDuplicate(batch)) {
-              // This request is a duplicate so return the information about the existing entry. Note that for requests
-              // coming from the client, there will only be one RecordBatch per request, so there will be only one iteration
-              // of the loop and the values below will not be updated more than once.
-              isDuplicate = true
-              firstOffset = lastEntry.firstOffset
-              lastOffset = lastEntry.lastOffset
-              maxTimestamp = lastEntry.timestamp
-              debug(s"Detected a duplicate for partition $topicPartition at (firstOffset, lastOffset): ($firstOffset, $lastOffset). " +
-                "Ignoring the incoming record.")
-            } else {
-              val producerAppendInfo = new ProducerAppendInfo(pid, lastEntry)
-              producerAppendInfos.put(pid, producerAppendInfo)
-              producerAppendInfo.append(batch)
-            }
-        }
-      }
     }
 
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
-
     LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec,
-      targetCodec, shallowMessageCount, validBytesCount, monotonic, producerAppendInfos.toMap, isDuplicate)
+      targetCodec, shallowMessageCount, validBytesCount, monotonic)
+  }
+
+  private def updateProducers(batch: RecordBatch,
+                              producers: mutable.Map[Long, ProducerAppendInfo],
+                              completedTxns: ListBuffer[CompletedTxn],
+                              lastEntry: Option[ProducerIdEntry],
+                              loadingFromLog: Boolean): Unit = {
+    val pid = batch.producerId
+    val appendInfo = producers.getOrElseUpdate(pid, new ProducerAppendInfo(pid, lastEntry, loadingFromLog))
+    val maybeCompletedTxn = appendInfo.append(batch)
+    maybeCompletedTxn.foreach(completedTxns += _)
   }
 
   /**
@@ -721,11 +811,19 @@ class Log(@volatile var dir: File,
    * @param maxLength The maximum number of bytes to read
    * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
    * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
+   * @param isolationLevel The isolation level of the fetcher. The READ_UNCOMMITTED isolation level has the traditional
+   *                       read semantics (e.g. consumers are limited to fetching up to the high watermark). In
+   *                       READ_COMMITTED, consumers are limited to fetching up to the last stable offset. Additionally,
+   *                       in READ_COMMITTED, the transaction index is consulted after fetching to collect the list
+   *                       of aborted transactions in the fetch range which the consumer uses to filter the fetched
+   *                       records before they are returned to the user. Note that fetches from followers always use
+   *                       READ_UNCOMMITTED.
    *
    * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
    * @return The fetch data information including fetch starting offset metadata and messages read.
    */
-  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
+  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false,
+           isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
 
     // Because we don't use lock for reading, the synchronization is a little bit tricky.
@@ -735,38 +833,43 @@ class Log(@volatile var dir: File,
     if(startOffset == next)
       return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY)
 
-    var entry = segments.floorEntry(startOffset)
+    var segmentEntry = segments.floorEntry(startOffset)
 
     // return error on attempt to read beyond the log end offset or read below log start offset
-    if(startOffset > next || entry == null || startOffset < logStartOffset)
+    if(startOffset > next || segmentEntry == null || startOffset < logStartOffset)
       throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
 
     // Do the read on the segment with a base offset less than the target offset
     // but if that segment doesn't contain any messages with an offset greater than that
     // continue to read from successive segments until we get some messages or we reach the end of the log
-    while(entry != null) {
+    while(segmentEntry != null) {
+      val segment = segmentEntry.getValue
+
       // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
       // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
       // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
       // end of the active segment.
       val maxPosition = {
-        if (entry == segments.lastEntry) {
+        if (segmentEntry == segments.lastEntry) {
           val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
           // Check the segment again in case a new segment has just rolled out.
-          if (entry != segments.lastEntry)
+          if (segmentEntry != segments.lastEntry)
             // New log segment has rolled out, we can read up to the file end.
-            entry.getValue.size
+            segment.size
           else
             exposedPos
         } else {
-          entry.getValue.size
+          segment.size
         }
       }
-      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
-      if(fetchInfo == null) {
-        entry = segments.higherEntry(entry.getKey)
+      val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
+      if (fetchInfo == null) {
+        segmentEntry = segments.higherEntry(segmentEntry.getKey)
       } else {
-        return fetchInfo
+        return isolationLevel match {
+          case IsolationLevel.READ_UNCOMMITTED => fetchInfo
+          case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
+        }
       }
     }
 
@@ -776,6 +879,41 @@ class Log(@volatile var dir: File,
     FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
   }
 
+  private def addAbortedTransactions(startOffset: Long, segmentEntry: JEntry[JLong, LogSegment],
+                                     fetchInfo: FetchDataInfo): FetchDataInfo = {
+    val fetchSize = fetchInfo.records.sizeInBytes
+    val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+      fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
+    val upperBoundOffset = segmentEntry.getValue.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse {
+      val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
+      if (nextSegmentEntry != null)
+        nextSegmentEntry.getValue.baseOffset
+      else
+        logEndOffset
+    }
+    val abortedTransactions = collectAbortedTransactions(startOffset, upperBoundOffset, segmentEntry)
+    FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
+      records = fetchInfo.records,
+      firstEntryIncomplete = fetchInfo.firstEntryIncomplete,
+      abortedTransactions = Some(abortedTransactions))
+  }
+
+  private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
+                                         startingSegmentEntry: JEntry[JLong, LogSegment]): List[AbortedTransaction] = {
+    var segmentEntry = startingSegmentEntry
+    val abortedTransactions = ListBuffer.empty[AbortedTransaction]
+
+    while (segmentEntry != null) {
+      val searchResult = segmentEntry.getValue.collectAbortedTxns(startOffset, upperBoundOffset)
+      abortedTransactions ++= searchResult.abortedTransactions
+      if (searchResult.isComplete)
+        return abortedTransactions.toList
+
+      segmentEntry = segments.higherEntry(segmentEntry.getKey)
+    }
+    abortedTransactions.toList
+  }
+
   /**
    * Get an offset based on the given timestamp
    * The offset returned is the offset of the first message whose timestamp is greater than or equals to the
@@ -860,7 +998,8 @@ class Log(@volatile var dir: File,
         deletable.foreach(deleteSegment)
         logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
         leaderEpochCache.clearEarliest(logStartOffset)
-        pidMap.expirePids(logStartOffset)
+        producerStateManager.evictUnretainedProducers(logStartOffset)
+        updateFirstUnstableOffset()
       }
     }
     numToDelete
@@ -934,7 +1073,7 @@ class Log(@volatile var dir: File,
   def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
 
   /**
-   *  The offset of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
    */
   def logEndOffset: Long = nextOffsetMetadata.messageOffset
 
@@ -990,9 +1129,10 @@ class Log(@volatile var dir: File,
     lock synchronized {
       val newOffset = math.max(expectedNextOffset, logEndOffset)
       val logFile = logFilename(dir, newOffset)
-      val indexFile = indexFilename(dir, newOffset)
-      val timeIndexFile = timeIndexFilename(dir, newOffset)
-      for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
+      val offsetIdxFile = offsetIndexFile(dir, newOffset)
+      val timeIdxFile = timeIndexFile(dir, newOffset)
+      val txnIdxFile = transactionIndexFile(dir, newOffset)
+      for(file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
         warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
         file.delete()
       }
@@ -1007,6 +1147,15 @@ class Log(@volatile var dir: File,
           seg.log.trim()
         }
       }
+
+      // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
+      // offset align with the new segment offset since this ensures we can recover the segment by beginning
+      // with the corresponding snapshot file and scanning the segment data. Because the segment base offset
+      // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
+      // we manually override the state offset here prior to taking the snapshot.
+      producerStateManager.updateMapEndOffset(newOffset)
+      producerStateManager.takeSnapshot()
+
       val segment = new LogSegment(dir,
                                    startOffset = newOffset,
                                    indexIntervalBytes = config.indexInterval,
@@ -1053,6 +1202,12 @@ class Log(@volatile var dir: File,
           time.milliseconds + " unflushed = " + unflushedMessages)
     for(segment <- logSegments(this.recoveryPoint, offset))
       segment.flush()
+
+    // now that we have flushed, we can clean up old producer snapshots. However, it is useful to retain
+    // the snapshots from the recent segments in case we need to truncate and rebuild the producer state.
+    // Otherwise, we would always need to rebuild from the earliest segment.
+    producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset))
+
     lock synchronized {
       if(offset > this.recoveryPoint) {
         this.recoveryPoint = offset
@@ -1061,6 +1216,17 @@ class Log(@volatile var dir: File,
     }
   }
 
+  def minSnapshotOffsetToRetain(flushedOffset: Long) = {
+    // always retain the producer snapshot from the last two segments. This solves the common case
+    // of truncating to an offset within the active segment, and the rarer case of truncating to the
+    // previous segment just after rolling the new segment.
+    var minSnapshotOffset = activeSegment.baseOffset
+    val previousSegment = segments.lowerEntry(activeSegment.baseOffset)
+    if (previousSegment != null)
+      minSnapshotOffset = previousSegment.getValue.baseOffset
+    math.min(flushedOffset, minSnapshotOffset)
+  }
+
   /**
    * Completely delete this log directory and all contents from the file system with no delay
    */
@@ -1073,11 +1239,25 @@ class Log(@volatile var dir: File,
     }
   }
 
-  private[log] def maybeTakePidSnapshot(): Unit = pidMap.maybeTakeSnapshot()
+  // visible for testing
+  private[log] def takeProducerSnapshot(): Unit = lock synchronized {
+    producerStateManager.takeSnapshot()
+  }
 
-  private[log] def latestPidSnapshotOffset: Option[Long] = pidMap.latestSnapshotOffset
+  // visible for testing
+  private[log] def latestProducerSnapshotOffset: Option[Long] = lock synchronized {
+    producerStateManager.latestSnapshotOffset
+  }
 
-  private[log] def latestPidMapOffset: Long = pidMap.mapEndOffset
+  // visible for testing
+  private[log] def oldestProducerSnapshotOffset: Option[Long] = lock synchronized {
+    producerStateManager.oldestSnapshotOffset
+  }
+
+  // visible for testing
+  private[log] def latestProducerStateEndOffset: Long = lock synchronized {
+    producerStateManager.mapEndOffset
+  }
 
   /**
    * Truncate this log so that it ends with the greatest offset < targetOffset.
@@ -1103,7 +1283,7 @@ class Log(@volatile var dir: File,
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
         this.logStartOffset = math.min(targetOffset, this.logStartOffset)
         leaderEpochCache.clearLatest(targetOffset)
-        buildAndRecoverPidMap(targetOffset)
+        loadProducerState(targetOffset)
       }
     }
   }
@@ -1130,8 +1310,9 @@ class Log(@volatile var dir: File,
       updateLogEndOffset(newOffset)
       leaderEpochCache.clear()
 
-      pidMap.truncate()
-      pidMap.updateMapEndOffset(newOffset)
+      producerStateManager.truncate()
+      producerStateManager.updateMapEndOffset(newOffset)
+      updateFirstUnstableOffset()
 
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
       this.logStartOffset = newOffset
@@ -1282,6 +1463,9 @@ object Log {
 
   val PidSnapshotFileSuffix = ".snapshot"
 
+  /** an (aborted) txn index */
+  val TxnIndexFileSuffix = ".txnindex"
+
   /** a file that is scheduled to be deleted */
   val DeletedFileSuffix = ".deleted"
 
@@ -1331,7 +1515,7 @@ object Log {
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
-  def indexFilename(dir: File, offset: Long) =
+  def offsetIndexFile(dir: File, offset: Long) =
     new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
 
   /**
@@ -1340,7 +1524,7 @@ object Log {
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
-  def timeIndexFilename(dir: File, offset: Long) =
+  def timeIndexFile(dir: File, offset: Long) =
     new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
 
   /**
@@ -1349,9 +1533,12 @@ object Log {
    * @param dir The directory in which the log will reside
    * @param offset The last offset (exclusive) included in the snapshot
    */
-  def pidSnapshotFilename(dir: File, offset: Long) =
+  def producerSnapshotFile(dir: File, offset: Long) =
     new File(dir, filenamePrefixFromOffset(offset) + PidSnapshotFileSuffix)
 
+  def transactionIndexFile(dir: File, offset: Long) =
+    new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix)
+
   def offsetFromFilename(filename: String): Long =
     filename.substring(0, filename.indexOf('.')).toLong
 
@@ -1387,4 +1574,12 @@ object Log {
     new TopicPartition(topic, partition.toInt)
   }
 
+  private def isIndexFile(file: File): Boolean = {
+    val filename = file.getName
+    filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix) || filename.endsWith(TxnIndexFileSuffix)
+  }
+
+  private def isLogFile(file: File): Boolean =
+    file.getPath.endsWith(LogFileSuffix)
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d0e8ec4..282e049 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -388,12 +388,18 @@ private[log] class Cleaner(val id: Int,
     logFile.delete()
     val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
     val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix)
+    val txnIndexFile = new File(segments.head.txnIndex.file.getPath + Log.CleanedFileSuffix)
     indexFile.delete()
     timeIndexFile.delete()
+    txnIndexFile.delete()
+
+    val startOffset = segments.head.baseOffset
     val records = FileRecords.open(logFile, false, log.initFileSize(), log.config.preallocate)
-    val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
-    val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize)
-    val cleaned = new LogSegment(records, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
+    val index = new OffsetIndex(indexFile, startOffset, segments.head.index.maxIndexSize)
+    val timeIndex = new TimeIndex(timeIndexFile, startOffset, segments.head.timeIndex.maxIndexSize)
+    val txnIndex = new TransactionIndex(startOffset, txnIndexFile)
+    val cleaned = new LogSegment(records, index, timeIndex, txnIndex, startOffset,
+      segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
 
     try {
       // clean segments into the new destination segment
@@ -451,7 +457,8 @@ private[log] class Cleaner(val id: Int,
                              activePids: Map[Long, ProducerIdEntry],
                              stats: CleanerStats) {
     val logCleanerFilter = new RecordFilter {
-      def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats, activePids, recordBatch.producerId)
+      def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean =
+        shouldRetainMessage(source, map, retainDeletes, recordBatch, record, stats, activePids)
     }
 
     var position = 0
@@ -492,17 +499,20 @@ private[log] class Cleaner(val id: Int,
   private def shouldRetainMessage(source: kafka.log.LogSegment,
                                   map: kafka.log.OffsetMap,
                                   retainDeletes: Boolean,
+                                  batch: RecordBatch,
                                   record: Record,
                                   stats: CleanerStats,
-                                  activePids: Map[Long, ProducerIdEntry],
-                                  pid: Long): Boolean = {
-    if (record.isControlRecord)
+                                  activeProducers: Map[Long, ProducerIdEntry]): Boolean = {
+    if (batch.isControlBatch)
       return true
 
     // retain the record if it is the last one produced by an active idempotent producer to ensure that
-    // the PID is not removed from the log before it has been expired
-    if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset))
-      return true
+    // the producerId is not removed from the log before it has been expired
+    if (batch.hasProducerId) {
+      val producerId = batch.producerId
+      if (RecordBatch.NO_PRODUCER_ID < producerId && activeProducers.get(producerId).exists(_.lastOffset == record.offset))
+        return true
+    }
 
     val pastLatestOffset = record.offset > map.latestOffset
     if (pastLatestOffset)
@@ -638,8 +648,8 @@ private[log] class Cleaner(val id: Int,
       throttler.maybeThrottle(records.sizeInBytes)
 
       val startPosition = position
-      for (record <- records.records.asScala) {
-        if (!record.isControlRecord && record.hasKey && record.offset >= start) {
+      for (batch <- records.batches.asScala; record <- batch.asScala) {
+        if (!batch.isControlBatch && record.hasKey && record.offset >= start) {
           if (map.size < maxDesiredMapSize)
             map.put(record.key, record.offset)
           else

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index b89fc40..c621680 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -189,7 +189,7 @@ class LogManager(val logDirs: Array[File],
         }
       }
 
-      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
+      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
     }
 
 
@@ -282,7 +282,6 @@ class LogManager(val logDirs: Array[File],
       jobs(dir) = jobsForDir.map(pool.submit).toSeq
     }
 
-
     try {
       for ((dir, dirJobs) <- jobs) {
         dirJobs.foreach(_.get)
@@ -312,7 +311,6 @@ class LogManager(val logDirs: Array[File],
     info("Shutdown complete.")
   }
 
-
   /**
    * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
    *
@@ -454,7 +452,7 @@ class LogManager(val logDirs: Array[File],
       case e: Throwable => 
         error(s"Exception in kafka-delete-logs thread.", e)
     }
-}
+  }
 
   /**
     * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index df3c372..d76b47a 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -27,14 +27,14 @@ import kafka.server.epoch.LeaderEpochCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.record.FileRecords.LogEntryPosition
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.math._
 
- /**
+/**
  * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
  * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
  * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
@@ -53,6 +53,7 @@ import scala.math._
 class LogSegment(val log: FileRecords,
                  val index: OffsetIndex,
                  val timeIndex: TimeIndex,
+                 val txnIndex: TransactionIndex,
                  val baseOffset: Long,
                  val indexIntervalBytes: Int,
                  val rollJitterMs: Long,
@@ -67,45 +68,49 @@ class LogSegment(val log: FileRecords,
   private var rollingBasedTimestamp: Option[Long] = None
 
   /* The maximum timestamp we see so far */
-  @volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp
-  @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset
+  @volatile private var maxTimestampSoFar: Long = timeIndex.lastEntry.timestamp
+  @volatile private var offsetOfMaxTimestamp: Long = timeIndex.lastEntry.offset
 
-  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time,
+           fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
     this(FileRecords.open(Log.logFilename(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
-         new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
-         new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+         new OffsetIndex(Log.offsetIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+         new TimeIndex(Log.timeIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+         new TransactionIndex(startOffset, Log.transactionIndexFile(dir, startOffset)),
          startOffset,
          indexIntervalBytes,
          rollJitterMs,
          time)
 
   /* Return the size in bytes of this log segment */
-  def size: Long = log.sizeInBytes()
+  def size: Int = log.sizeInBytes()
 
-   /**
-     * checks that the argument offset can be represented as an integer offset relative to the baseOffset.
-     */
-   def canConvertToRelativeOffset(offset: Long): Boolean = {
-     (offset - baseOffset) <= Integer.MAX_VALUE
-   }
+  /**
+   * checks that the argument offset can be represented as an integer offset relative to the baseOffset.
+   */
+  def canConvertToRelativeOffset(offset: Long): Boolean = {
+    (offset - baseOffset) <= Integer.MAX_VALUE
+  }
 
-   /**
+  /**
    * Append the given messages starting with the given offset. Add
    * an entry to the index if needed.
    *
    * It is assumed this method is being called from within a lock.
    *
    * @param firstOffset The first offset in the message set.
+   * @param largestOffset The last offset in the message set
    * @param largestTimestamp The largest timestamp in the message set.
    * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
    * @param records The log entries to append.
+   * Note: no value is returned (the result type is Unit); the records are appended to this segment.
    */
   @nonthreadsafe
   def append(firstOffset: Long,
              largestOffset: Long,
              largestTimestamp: Long,
              shallowOffsetOfMaxTimestamp: Long,
-             records: MemoryRecords) {
+             records: MemoryRecords): Unit = {
     if (records.sizeInBytes > 0) {
       trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
           .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
@@ -131,6 +136,28 @@ class LogSegment(val log: FileRecords,
     }
   }
 
+  @nonthreadsafe
+  def updateTxnIndex(completedTxn: CompletedTxn, lastStableOffset: Long) {
+    if (completedTxn.isAborted) {
+      trace(s"Writing aborted transaction $completedTxn to transaction index, last stable offset is $lastStableOffset")
+      txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset))
+    }
+  }
+
+  private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
+    if (batch.hasProducerId) {
+      val producerId = batch.producerId
+      val lastEntry = producerStateManager.lastEntry(producerId)
+      val appendInfo = new ProducerAppendInfo(batch.producerId, lastEntry, loadingFromLog = true)
+      val maybeCompletedTxn = appendInfo.append(batch)
+      producerStateManager.update(appendInfo)
+      maybeCompletedTxn.foreach { completedTxn =>
+        val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+        updateTxnIndex(completedTxn, lastStableOffset)
+      }
+    }
+  }
+
   /**
    * Find the physical file position for the first message with offset >= the requested offset.
    *
@@ -144,7 +171,7 @@ class LogSegment(val log: FileRecords,
     *        message or null if no message meets this criteria.
    */
   @threadsafe
-  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = {
+  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
     val mapping = index.lookup(offset)
     log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
   }
@@ -175,7 +202,7 @@ class LogSegment(val log: FileRecords,
     if (startOffsetAndSize == null)
       return null
 
-    val startPosition = startOffsetAndSize.position.toInt
+    val startPosition = startOffsetAndSize.position
     val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
 
     val adjustedMaxSize =
@@ -187,7 +214,7 @@ class LogSegment(val log: FileRecords,
       return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
 
     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
-    val length = maxOffset match {
+    val fetchSize: Int = maxOffset match {
       case None =>
         // no max offset, just read until the max position
         min((maxPosition - startPosition).toInt, adjustedMaxSize)
@@ -207,24 +234,32 @@ class LogSegment(val log: FileRecords,
         min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
     }
 
-    FetchDataInfo(offsetMetadata, log.read(startPosition, length),
+    FetchDataInfo(offsetMetadata, log.read(startPosition, fetchSize),
       firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
   }
 
+   def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Option[Long] =
+     index.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)
+
   /**
    * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
    *
    * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
    * is corrupt.
-    * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
+   * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
+   *                             the transaction index.
+   * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
    * @return The number of bytes truncated from the log
    */
   @nonthreadsafe
-  def recover(maxMessageSize: Int, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+  def recover(maxMessageSize: Int,
+              producerStateManager: ProducerStateManager,
+              leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
     index.truncate()
     index.resize(index.maxIndexSize)
     timeIndex.truncate()
     timeIndex.resize(timeIndex.maxIndexSize)
+    txnIndex.truncate()
     var validBytes = 0
     var lastIndexEntry = 0
     maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
@@ -250,8 +285,9 @@ class LogSegment(val log: FileRecords,
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
           leaderEpochCache.foreach { cache =>
             if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
-              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset())
+              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
           }
+          updateProducerState(producerStateManager, batch)
         }
       }
     } catch {
@@ -268,22 +304,23 @@ class LogSegment(val log: FileRecords,
     truncated
   }
 
-  def loadLargestTimestamp(readToLogEnd: Boolean = false) {
+  private def loadLargestTimestamp() {
     // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset)
     val lastTimeIndexEntry = timeIndex.lastEntry
     maxTimestampSoFar = lastTimeIndexEntry.timestamp
     offsetOfMaxTimestamp = lastTimeIndexEntry.offset
-    if (readToLogEnd) {
-      val offsetPosition = index.lookup(lastTimeIndexEntry.offset)
-      // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
-      val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
-      if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
-        maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp
-        offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset
-      }
+
+    val offsetPosition = index.lookup(lastTimeIndexEntry.offset)
+    // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
+    val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
+    if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
+      maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp
+      offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset
     }
   }
 
+  def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult =
+    txnIndex.collectAbortedTxns(fetchOffset, upperBoundOffset)
 
   override def toString = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
 
@@ -301,6 +338,7 @@ class LogSegment(val log: FileRecords,
       return 0
     index.truncateTo(offset)
     timeIndex.truncateTo(offset)
+    txnIndex.truncateTo(offset)
     // after truncation, reset and allocate more space for the (new currently  active) index
     index.resize(index.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
@@ -310,9 +348,8 @@ class LogSegment(val log: FileRecords,
       rollingBasedTimestamp = None
     }
     bytesSinceLastIndexEntry = 0
-    // We may need to reload the max timestamp after truncation.
     if (maxTimestampSoFar >= 0)
-      loadLargestTimestamp(readToLogEnd = true)
+      loadLargestTimestamp()
     bytesTruncated
   }
 
@@ -323,14 +360,12 @@ class LogSegment(val log: FileRecords,
   @threadsafe
   def nextOffset(): Long = {
     val ms = read(index.lastOffset, None, log.sizeInBytes)
-    if (ms == null) {
+    if (ms == null)
       baseOffset
-    } else {
-      ms.records.batches.asScala.lastOption match {
-        case None => baseOffset
-        case Some(last) => last.nextOffset
-      }
-    }
+    else
+      ms.records.batches.asScala.lastOption
+        .map(_.nextOffset)
+        .getOrElse(baseOffset)
   }
 
   /**
@@ -342,6 +377,7 @@ class LogSegment(val log: FileRecords,
       log.flush()
       index.flush()
       timeIndex.flush()
+      txnIndex.flush()
     }
   }
 
@@ -365,6 +401,10 @@ class LogSegment(val log: FileRecords,
     catch {
       case e: IOException => throw kafkaStorageException("timeindex", e)
     }
+    try txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
+    catch {
+      case e: IOException => throw kafkaStorageException("txnindex", e)
+    }
   }
 
   /**
@@ -437,6 +477,7 @@ class LogSegment(val log: FileRecords,
     CoreUtils.swallow(index.close())
     CoreUtils.swallow(timeIndex.close())
     CoreUtils.swallow(log.close())
+    CoreUtils.swallow(txnIndex.close())
   }
 
   /**
@@ -448,12 +489,15 @@ class LogSegment(val log: FileRecords,
     val deletedLog = log.delete()
     val deletedIndex = index.delete()
     val deletedTimeIndex = timeIndex.delete()
-    if(!deletedLog && log.file.exists)
+    val deletedTxnIndex = txnIndex.delete()
+    if (!deletedLog && log.file.exists)
       throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
-    if(!deletedIndex && index.file.exists)
+    if (!deletedIndex && index.file.exists)
       throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
-    if(!deletedTimeIndex && timeIndex.file.exists)
+    if (!deletedTimeIndex && timeIndex.file.exists)
       throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
+    if (!deletedTxnIndex && txnIndex.file.exists)
+      throw new KafkaStorageException("Delete of transaction index " + txnIndex.file.getName + " failed.")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index c1777d5..33257fd 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -52,24 +52,32 @@ private[kafka] object LogValidator extends Logging {
                                                       magic: Byte,
                                                       timestampType: TimestampType,
                                                       timestampDiffMaxMs: Long,
-                                                      partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+                                                      partitionLeaderEpoch: Int,
+                                                      isFromClient: Boolean): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
       if (!records.hasMatchingMagic(magic))
         convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType,
-          timestampDiffMaxMs, magic, partitionLeaderEpoch)
+          timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient)
       else
         // Do in-place validation, offset assignment and maybe set timestamp
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
-          partitionLeaderEpoch)
+          partitionLeaderEpoch, isFromClient)
     } else {
       validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
-        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch)
+        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
     }
   }
 
-  private def validateBatch(batch: RecordBatch): Unit = {
-    ensureNonTransactional(batch)
+  private def validateBatch(batch: RecordBatch, isFromClient: Boolean): Unit = {
+    if (isFromClient) {
+      if (batch.hasProducerId && batch.baseSequence < 0)
+        throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
+          s"with producerId ${batch.producerId}")
+
+      if (batch.isControlBatch)
+        throw new InvalidRecordException("Clients are not allowed to write control records")
+    }
   }
 
   private def validateRecord(batch: RecordBatch, record: Record, now: Long, timestampType: TimestampType,
@@ -84,7 +92,6 @@ private[kafka] object LogValidator extends Logging {
     if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed)
       record.ensureValid()
 
-    ensureNotControlRecord(record)
     validateKey(record, compactedTopic)
     validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs)
   }
@@ -96,21 +103,22 @@ private[kafka] object LogValidator extends Logging {
                                                    timestampType: TimestampType,
                                                    timestampDiffMaxMs: Long,
                                                    toMagicValue: Byte,
-                                                   partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+                                                   partitionLeaderEpoch: Int,
+                                                   isFromClient: Boolean): ValidationAndOffsetAssignResult = {
     val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
       CompressionType.NONE, records.records)
 
-    val (pid, epoch, sequence) = {
+    val (producerId, producerEpoch, sequence, isTransactional) = {
       val first = records.batches.asScala.head
-      (first.producerId, first.producerEpoch, first.baseSequence)
+      (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
     }
 
     val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
     val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
-      offsetCounter.value, now, pid, epoch, sequence, false, partitionLeaderEpoch)
+      offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
 
     for (batch <- records.batches.asScala) {
-      validateBatch(batch)
+      validateBatch(batch, isFromClient)
 
       for (record <- batch.asScala) {
         validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
@@ -133,21 +141,21 @@ private[kafka] object LogValidator extends Logging {
                                          compactedTopic: Boolean,
                                          timestampType: TimestampType,
                                          timestampDiffMaxMs: Long,
-                                         partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+                                         partitionLeaderEpoch: Int,
+                                         isFromClient: Boolean): ValidationAndOffsetAssignResult = {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
     var isMagicV2 = false
 
     for (batch <- records.batches.asScala) {
-      validateBatch(batch)
+      validateBatch(batch, isFromClient)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
 
       for (record <- batch.asScala) {
         validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
-
         val offset = offsetCounter.getAndIncrement()
         if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
           maxBatchTimestamp = record.timestamp
@@ -206,7 +214,8 @@ private[kafka] object LogValidator extends Logging {
                                                  magic: Byte,
                                                  timestampType: TimestampType,
                                                  timestampDiffMaxMs: Long,
-                                                 partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+                                                 partitionLeaderEpoch: Int,
+                                                 isFromClient: Boolean): ValidationAndOffsetAssignResult = {
 
       // No in place assignment situation 1 and 2
       var inPlaceAssignment = sourceCodec == targetCodec && magic > RecordBatch.MAGIC_VALUE_V0
@@ -216,14 +225,17 @@ private[kafka] object LogValidator extends Logging {
       val validatedRecords = new mutable.ArrayBuffer[Record]
 
       for (batch <- records.batches.asScala) {
-        validateBatch(batch)
+        validateBatch(batch, isFromClient)
+
+        // Do not compress control records unless they are written compressed
+        if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+          inPlaceAssignment = true
 
         for (record <- batch.asScala) {
           validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
             throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
               s"compression attribute set: $record")
-
           if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && magic > RecordBatch.MAGIC_VALUE_V0) {
             // Check if we need to overwrite offset
             // No in place assignment situation 3
@@ -242,15 +254,15 @@ private[kafka] object LogValidator extends Logging {
       }
 
       if (!inPlaceAssignment) {
-        val (pid, epoch, sequence) = {
+        val (producerId, producerEpoch, sequence, isTransactional) = {
           // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
           // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
           // with older magic versions, there will never be a producer id, etc.
           val first = records.batches.asScala.head
-          (first.producerId, first.producerEpoch, first.baseSequence)
+          (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
         }
         buildRecordsAndAssignOffsets(magic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
-          validatedRecords, pid, epoch, sequence, partitionLeaderEpoch)
+          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
       } else {
         // we can update the batch only and write the compressed payload as is
         val batch = records.batches.iterator.next()
@@ -274,14 +286,22 @@ private[kafka] object LogValidator extends Logging {
       }
   }
 
-  private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType,
-                                           compressionType: CompressionType, logAppendTime: Long,
+  private def buildRecordsAndAssignOffsets(magic: Byte,
+                                           offsetCounter: LongRef,
+                                           timestampType: TimestampType,
+                                           compressionType: CompressionType,
+                                           logAppendTime: Long,
                                            validatedRecords: Seq[Record],
-                                           producerId: Long, epoch: Short, baseSequence: Int, partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
-    val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava)
+                                           producerId: Long,
+                                           producerEpoch: Short,
+                                           baseSequence: Int,
+                                           isTransactional: Boolean,
+                                           partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+    val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
+      validatedRecords.asJava)
     val buffer = ByteBuffer.allocate(estimatedSize)
     val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
-      logAppendTime, producerId, epoch, baseSequence, false, partitionLeaderEpoch)
+      logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch)
 
     validatedRecords.foreach { record =>
       builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
@@ -297,17 +317,6 @@ private[kafka] object LogValidator extends Logging {
       messageSizeMaybeChanged = true)
   }
 
-  private def ensureNonTransactional(batch: RecordBatch) {
-    if (batch.isTransactional)
-      throw new InvalidRecordException("Transactional messages are not currently supported")
-  }
-
-  private def ensureNotControlRecord(record: Record) {
-    // Until we have implemented transaction support, we do not permit control records to be written
-    if (record.isControlRecord)
-      throw new InvalidRecordException("Control messages are not currently supported")
-  }
-
   private def validateKey(record: Record, compactedTopic: Boolean) {
     if (compactedTopic && !record.hasKey)
       throw new InvalidRecordException("Compacted topic cannot accept message without key.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index a54579f..e4939e8 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -85,7 +85,7 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable
   def lookup(targetOffset: Long): OffsetPosition = {
     maybeLock(lock) {
       val idx = mmap.duplicate
-      val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY)
+      val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
       if(slot == -1)
         OffsetPosition(baseOffset, 0)
       else
@@ -93,6 +93,22 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable
     }
   }
 
+  /**
+   * Find an upper bound offset for the given fetch starting position and size. This is an offset which
+   * is guaranteed to be outside the fetched range, but note that it will not generally be the smallest
+   * such offset.
+   */
+  def fetchUpperBoundOffset(fetchOffset: OffsetPosition, fetchSize: Int): Option[OffsetPosition] = {
+    maybeLock(lock) {
+      val idx = mmap.duplicate
+      val slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE)
+      if (slot == -1)
+        None
+      else
+        Some(parseEntry(idx, slot).asInstanceOf[OffsetPosition])
+    }
+  }
+
   private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
 
   private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
@@ -140,7 +156,7 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable
   override def truncateTo(offset: Long) {
     inLock(lock) {
       val idx = mmap.duplicate
-      val slot = indexSlotFor(idx, offset, IndexSearchType.KEY)
+      val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY)
 
       /* There are 3 cases for choosing the new size
        * 1) if there is no entry in the index <= the offset, delete everything