You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/03/28 16:59:50 UTC

[2/3] kafka git commit: KAFKA-4586; Add purgeDataBefore() API (KIP-107)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c2d34d9..ddb2411 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -28,7 +28,7 @@ import kafka.controller.KafkaController
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException}
+import org.apache.kafka.common.errors.{PolicyViolationException, NotEnoughReplicasException, NotLeaderForPartitionException}
 import org.apache.kafka.common.protocol.Errors
 
 import scala.collection.JavaConverters._
@@ -235,10 +235,20 @@ class Partition(val topic: String,
   def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) {
     getReplica(replicaId) match {
       case Some(replica) =>
+        // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
+        val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
         replica.updateLogReadResult(logReadResult)
+        val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
+        // check if the LW of the partition has incremented
+        // since the replica's logStartOffset may have incremented
+        val leaderLWIncremented = newLeaderLW > oldLeaderLW
         // check if we need to expand ISR to include this replica
         // if it is not in the ISR yet
-        maybeExpandIsr(replicaId, logReadResult)
+        val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)
+
+        // some delayed operations may be unblocked after HW or LW changed
+        if (leaderLWIncremented || leaderHWIncremented)
+          tryCompleteDelayedRequests()
 
         debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
           .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition))
@@ -263,8 +273,8 @@ class Partition(val topic: String,
    *
    * This function can be triggered when a replica's LEO has incremented
    */
-  def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) {
-    val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
+  def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
+    inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
@@ -280,18 +290,12 @@ class Partition(val topic: String,
             updateIsr(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
           }
-
           // check if the HW of the partition can now be incremented
           // since the replica may already be in the ISR and its LEO has just incremented
           maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)
-
         case None => false // nothing to do if no longer leader
       }
     }
-
-    // some delayed operations may be unblocked after HW changed
-    if (leaderHWIncremented)
-      tryCompleteDelayedRequests()
   }
 
   /*
@@ -376,12 +380,25 @@ class Partition(val topic: String,
   }
 
   /**
+   * The low watermark offset value, calculated only if the local replica is the partition leader
+   * It is only used by leader broker to decide when DeleteRecordsRequest is satisfied. Its value is minimum logStartOffset of all live replicas
+   * Low watermark will increase when the leader broker receives either FetchRequest or DeleteRecordsRequest.
+   */
+  def lowWatermarkIfLeader: Long = {
+    if (!isLeaderReplicaLocal)
+      throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d".format(topicPartition, localBrokerId))
+    assignedReplicas.filter(replica =>
+      replicaManager.metadataCache.isBrokerAlive(replica.brokerId)).map(_.logStartOffset).reduceOption(_ min _).getOrElse(0L)
+  }
+
+  /**
    * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
    */
   private def tryCompleteDelayedRequests() {
     val requestKey = new TopicPartitionOperationKey(topicPartition)
     replicaManager.tryCompleteDelayedFetch(requestKey)
     replicaManager.tryCompleteDelayedProduce(requestKey)
+    replicaManager.tryCompleteDelayedDeleteRecords(requestKey)
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
@@ -467,6 +484,27 @@ class Partition(val topic: String,
     info
   }
 
+  /**
+   * Update logStartOffset and low watermark if 1) offset <= highWatermark and 2) it is the leader replica.
+   * This function can trigger log segment deletion and log rolling.
+   *
+   * Return low watermark of the partition.
+   */
+  def deleteRecordsOnLeader(offset: Long): Long = {
+    inReadLock(leaderIsrUpdateLock) {
+      leaderReplicaIfLocal match {
+        case Some(leaderReplica) =>
+          leaderReplica.maybeIncrementLogStartOffset(offset)
+          if (!leaderReplica.log.get.config.delete)
+            throw new PolicyViolationException("Records of partition %s can not be deleted due to the configured policy".format(topicPartition))
+          lowWatermarkIfLeader
+        case None =>
+          throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+            .format(topicPartition, localBrokerId))
+      }
+    }
+  }
+
   private def updateIsr(newIsr: Set[Replica]) {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 8597b06..3995f9e 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -21,7 +21,8 @@ import kafka.log.Log
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
-import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+
 
 import org.apache.kafka.common.utils.Time
 
@@ -35,6 +36,9 @@ class Replica(val brokerId: Int,
   // the log end offset value, kept in all replicas;
   // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
   @volatile private[this] var logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
+  // the log start offset value, kept in all replicas;
+  // for local replica it is the log's start offset, for remote replicas its value is only updated by follower fetch
+  @volatile private[this] var _logStartOffset = Log.UnknownLogStartOffset
 
   // The log end offset value at the time the leader received the last FetchRequest from this follower
   // This is used to determine the lastCaughtUpTimeMs of the follower
@@ -72,6 +76,7 @@ class Replica(val brokerId: Int,
     else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
       _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)
 
+    logStartOffset = logReadResult.followerLogStartOffset
     logEndOffset = logReadResult.info.fetchOffsetMetadata
     lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
     lastFetchTimeMs = logReadResult.fetchTimeMs
@@ -98,6 +103,33 @@ class Replica(val brokerId: Int,
     else
       logEndOffsetMetadata
 
+  def maybeIncrementLogStartOffset(offset: Long) {
+    if (isLocal) {
+      if (highWatermark.messageOffset < offset)
+        throw new OffsetOutOfRangeException(s"The specified offset $offset is higher than the high watermark" +
+                                            s" ${highWatermark.messageOffset} of the partition $topicPartition")
+      log.get.maybeIncrementLogStartOffset(offset)
+    } else {
+      throw new KafkaException(s"Should not try to delete records on partition $topicPartition's non-local replica $brokerId")
+    }
+  }
+
+  private def logStartOffset_=(newLogStartOffset: Long) {
+    if (isLocal) {
+      throw new KafkaException(s"Should not set log start offset on partition $topicPartition's local replica $brokerId " +
+                               s"without attempting to delete records of the log")
+    } else {
+      _logStartOffset = newLogStartOffset
+      trace(s"Setting log start offset for remote replica $brokerId for partition $topicPartition to [$newLogStartOffset]")
+    }
+  }
+
+  def logStartOffset =
+    if (isLocal)
+      log.get.logStartOffset
+    else
+      _logStartOffset
+
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
       highWatermarkMetadata = newHighWatermark

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 75b1f24..c4b7ce6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -101,8 +101,7 @@ class ConsumerFetcherThread(name: String,
   protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
     partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
       if (partitionFetchState.isActive)
-        fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.offset,
-          fetchSize)
+        fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.fetchOffset, fetchSize)
     }
 
     new FetchRequest(fetchRequestBuilder.build())

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d2cac23..96535b1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -79,6 +79,17 @@ case class LogAppendInfo(var firstOffset: Long,
  *
  * @param dir The directory in which log segments are created.
  * @param config The log configuration settings
+ * @param logStartOffset The earliest offset allowed to be exposed to kafka client.
+ *                       The logStartOffset can be updated by :
+ *                       - user's DeleteRecordsRequest
+ *                       - broker's log retention
+ *                       - broker's log truncation
+ *                       The logStartOffset is used to decide the following:
+ *                       - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
+ *                         It may trigger log rolling if the active segment is deleted.
+ *                       - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
+ *                         we make sure that logStartOffset <= log's highWatermark
+ *                       Other activities such as log cleaning are not affected by logStartOffset.
  * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
  * @param scheduler The thread pool scheduler used for background actions
  * @param time The time instance used for checking the clock
@@ -87,6 +98,7 @@ case class LogAppendInfo(var firstOffset: Long,
 @threadsafe
 class Log(@volatile var dir: File,
           @volatile var config: LogConfig,
+          @volatile var logStartOffset: Long = 0L,
           @volatile var recoveryPoint: Long = 0L,
           scheduler: Scheduler,
           time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
@@ -118,8 +130,10 @@ class Log(@volatile var dir: File,
     nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
       activeSegment.size.toInt)
 
-    info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
-      .format(name, segments.size(), logEndOffset, time.milliseconds - startMs))
+    logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+
+    info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
+      .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs))
   }
 
   val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
@@ -443,6 +457,20 @@ class Log(@volatile var dir: File,
     }
   }
 
+  /*
+   * Increment the log start offset if the provided offset is larger.
+   */
+  def maybeIncrementLogStartOffset(offset: Long) {
+    // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
+    // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
+    // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
+    lock synchronized {
+      if (offset > logStartOffset) {
+        logStartOffset = offset
+      }
+    }
+  }
+
   /**
    * Validate the following:
    * <ol>
@@ -543,7 +571,7 @@ class Log(@volatile var dir: File,
    * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
    * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
    *
-   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
+   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
    * @return The fetch data information including fetch starting offset metadata and messages read.
    */
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
@@ -558,9 +586,9 @@ class Log(@volatile var dir: File,
 
     var entry = segments.floorEntry(startOffset)
 
-    // attempt to read beyond the log end offset is an error
-    if(startOffset > next || entry == null)
-      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
+    // return error on attempt to read beyond the log end offset or read below log start offset
+    if(startOffset > next || entry == null || startOffset < logStartOffset)
+      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
 
     // Do the read on the segment with a base offset less than the target offset
     // but if that segment doesn't contain any messages with an offset greater than that
@@ -626,7 +654,7 @@ class Log(@volatile var dir: File,
     val segmentsCopy = logSegments.toBuffer
     // For the earliest and latest, we do not need to return the timestamp.
     if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
+        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
     else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
         return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
 
@@ -640,7 +668,7 @@ class Log(@volatile var dir: File,
         None
     }
 
-    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))
+    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
   }
 
   /**
@@ -666,16 +694,21 @@ class Log(@volatile var dir: File,
   private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
     lock synchronized {
       val deletable = deletableSegments(predicate)
-      val numToDelete = deletable.size
-      if (numToDelete > 0) {
-        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
-        if (segments.size == numToDelete)
-          roll()
-        // remove the segments for lookups
-        deletable.foreach(deleteSegment)
-      }
-      numToDelete
+      deleteSegments(deletable)
+    }
+  }
+
+  private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
+    val numToDelete = deletable.size
+    if (numToDelete > 0) {
+      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
+      if (segments.size == numToDelete)
+        roll()
+      // remove the segments for lookups
+      deletable.foreach(deleteSegment)
+      logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
     }
+    numToDelete
   }
 
   /**
@@ -696,10 +729,10 @@ class Log(@volatile var dir: File,
     */
   def deleteOldSegments(): Int = {
     if (!config.delete) return 0
-    deleteRetenionMsBreachedSegments() + deleteRetentionSizeBreachedSegments()
+    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
   }
 
-  private def deleteRetenionMsBreachedSegments() : Int = {
+  private def deleteRetentionMsBreachedSegments() : Int = {
     if (config.retentionMs < 0) return 0
     val startMs = time.milliseconds
     deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
@@ -719,16 +752,27 @@ class Log(@volatile var dir: File,
     deleteOldSegments(shouldDelete)
   }
 
+  private def deleteLogStartOffsetBreachedSegments() : Int = {
+    // keep active segment to avoid frequent log rolling due to user's DeleteRecordsRequest
+    lock synchronized {
+      val deletable = {
+        if (segments.size() < 2)
+          Seq.empty
+        else
+          logSegments.sliding(2).takeWhile { iterable =>
+            val nextSegment = iterable.toSeq(1)
+            nextSegment.baseOffset <= logStartOffset
+          }.map(_.toSeq(0)).toSeq
+      }
+      deleteSegments(deletable)
+    }
+  }
+
   /**
    * The size of the log in bytes
    */
   def size: Long = logSegments.map(_.size).sum
 
-   /**
-   * The earliest message offset in the log
-   */
-  def logStartOffset: Long = logSegments.head.baseOffset
-
   /**
    * The offset metadata of the next message that will be appended to the log
    */
@@ -789,7 +833,7 @@ class Log(@volatile var dir: File,
   def roll(expectedNextOffset: Long = 0): LogSegment = {
     val start = time.nanoseconds
     lock synchronized {
-      val newOffset = Math.max(expectedNextOffset, logEndOffset)
+      val newOffset = math.max(expectedNextOffset, logEndOffset)
       val logFile = logFilename(dir, newOffset)
       val indexFile = indexFilename(dir, newOffset)
       val timeIndexFile = timeIndexFilename(dir, newOffset)
@@ -895,6 +939,7 @@ class Log(@volatile var dir: File,
         activeSegment.truncateTo(targetOffset)
         updateLogEndOffset(targetOffset)
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
+        this.logStartOffset = math.min(targetOffset, this.logStartOffset)
       }
     }
   }
@@ -920,6 +965,7 @@ class Log(@volatile var dir: File,
                                 preallocate = config.preallocate))
       updateLogEndOffset(newOffset)
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
+      this.logStartOffset = newOffset
     }
   }
 
@@ -1082,6 +1128,8 @@ object Log {
   /** a directory that is scheduled to be deleted */
   val DeleteDirSuffix = "-delete"
 
+  val UnknownLogStartOffset = -1L
+
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
    * so that ls sorts the files numerically.

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 761edf9..a555420 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -48,12 +48,14 @@ class LogManager(val logDirs: Array[File],
                  val cleanerConfig: CleanerConfig,
                  ioThreads: Int,
                  val flushCheckMs: Long,
-                 val flushCheckpointMs: Long,
+                 val flushRecoveryOffsetCheckpointMs: Long,
+                 val flushStartOffsetCheckpointMs: Long,
                  val retentionCheckMs: Long,
                  scheduler: Scheduler,
                  val brokerState: BrokerState,
                  time: Time) extends Logging {
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
+  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000
 
@@ -64,6 +66,7 @@ class LogManager(val logDirs: Array[File],
   createAndValidateLogDirs(logDirs)
   private val dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
+  private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, LogStartOffsetCheckpointFile)))).toMap
   loadLogs()
 
   // public, so we can access this from kafka.admin.DeleteTopicTest
@@ -139,10 +142,18 @@ class LogManager(val logDirs: Array[File],
         recoveryPoints = this.recoveryPointCheckpoints(dir).read
       } catch {
         case e: Exception =>
-          warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
+          warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e)
           warn("Resetting the recovery checkpoint to 0")
       }
 
+      var logStartOffsets = Map[TopicPartition, Long]()
+      try {
+        logStartOffsets = this.logStartOffsetCheckpoints(dir).read
+      } catch {
+        case e: Exception =>
+          warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e)
+      }
+
       val jobsForDir = for {
         dirContent <- Option(dir.listFiles).toList
         logDir <- dirContent if logDir.isDirectory
@@ -153,8 +164,9 @@ class LogManager(val logDirs: Array[File],
           val topicPartition = Log.parseTopicPartitionName(logDir)
           val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
           val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
+          val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
 
-          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
+          val current = new Log(logDir, config, logStartOffset, logRecoveryPoint, scheduler, time)
           if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
             this.logsToBeDeleted.add(current)
           } else {
@@ -210,7 +222,12 @@ class LogManager(val logDirs: Array[File],
       scheduler.schedule("kafka-recovery-point-checkpoint",
                          checkpointRecoveryPointOffsets,
                          delay = InitialTaskDelayMs,
-                         period = flushCheckpointMs,
+                         period = flushRecoveryOffsetCheckpointMs,
+                         TimeUnit.MILLISECONDS)
+      scheduler.schedule("kafka-log-start-offset-checkpoint",
+                         checkpointLogStartOffsets,
+                         delay = InitialTaskDelayMs,
+                         period = flushStartOffsetCheckpointMs,
                          TimeUnit.MILLISECONDS)
       scheduler.schedule("kafka-delete-logs",
                          deleteLogs,
@@ -263,7 +280,10 @@ class LogManager(val logDirs: Array[File],
 
         // update the last flush point
         debug("Updating recovery points at " + dir)
-        checkpointLogsInDir(dir)
+        checkpointLogRecoveryOffsetsInDir(dir)
+
+        debug("Updating log start offsets at " + dir)
+        checkpointLogStartOffsetsInDir(dir)
 
         // mark that the shutdown was clean by creating marker file
         debug("Writing clean shutdown marker at " + dir)
@@ -333,13 +353,21 @@ class LogManager(val logDirs: Array[File],
    * to avoid recovering the whole log on startup.
    */
   def checkpointRecoveryPointOffsets() {
-    this.logDirs.foreach(checkpointLogsInDir)
+    this.logDirs.foreach(checkpointLogRecoveryOffsetsInDir)
+  }
+
+  /**
+   * Write out the current log start offset for all logs to a text file in the log directory
+   * to avoid exposing data that have been deleted by DeleteRecordsRequest
+   */
+  def checkpointLogStartOffsets() {
+    this.logDirs.foreach(checkpointLogStartOffsetsInDir)
   }
 
   /**
    * Make a checkpoint for all logs in provided directory.
    */
-  private def checkpointLogsInDir(dir: File): Unit = {
+  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
     val recoveryPoints = this.logsByDir.get(dir.toString)
     if (recoveryPoints.isDefined) {
       this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
@@ -347,6 +375,17 @@ class LogManager(val logDirs: Array[File],
   }
 
   /**
+   * Checkpoint log start offset for all logs in provided directory.
+   */
+  private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
+    val logs = this.logsByDir.get(dir.toString)
+    if (logs.isDefined) {
+      this.logStartOffsetCheckpoints(dir).write(
+        logs.get.filter{case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset}.mapValues(_.logStartOffset))
+    }
+  }
+
+  /**
    * Get the log if it exists, otherwise return None
    */
   def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition))
@@ -362,7 +401,7 @@ class LogManager(val logDirs: Array[File],
         val dataDir = nextLogDir()
         val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
         dir.mkdirs()
-        val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
+        val log = new Log(dir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler, time)
         logs.put(topicPartition, log)
         info("Created log for partition [%s,%d] in %s with properties {%s}."
           .format(topicPartition.topic,
@@ -425,6 +464,7 @@ class LogManager(val logDirs: Array[File],
       val renamedDir = new File(removedLog.dir.getParent, dirName)
       val renameSuccessful = removedLog.dir.renameTo(renamedDir)
       if (renameSuccessful) {
+        checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
         removedLog.dir = renamedDir
         // change the file pointers for log and index file
         for (logSegment <- removedLog.logSegments) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 9263515..4e77625 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -387,32 +387,33 @@ class LogSegment(val log: FileRecords,
   }
 
   /**
-   * Search the message offset based on timestamp.
-   * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
-   * greater than or equals to the target timestamp.
+   * Search the message offset based on timestamp and offset.
    *
-   * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
-   * timestamp will be max timestamp in the segment.
+   * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
    *
-   * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
-   * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
+   * - If all the messages in the segment have smaller offsets, return None
+   * - If all the messages in the segment have smaller timestamps, return None
+   * - If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp
+   *   the returned the offset will be max(the base offset of the segment, startingOffset) and the timestamp will be Message.NoTimestamp.
+   * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp
+   *   is greater than or equals to the target timestamp and whose offset is greater than or equals to the startingOffset.
    *
-   * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
-   * from the indexed position. This could happen if the log is truncated after we get the indexed position but
-   * before we scan the log from there. In this case we simply return None and the caller will need to check on
-   * the truncated log and maybe retry or even do the search on another log segment.
+   * This methods only returns None when 1) all messages' offset < startOffing or 2) the log is not empty but we did not
+   * see any message when scanning the log from the indexed position. The latter could happen if the log is truncated
+   * after we get the indexed position but before we scan the log from there. In this case we simply return None and the
+   * caller will need to check on the truncated log and maybe retry or even do the search on another log segment.
    *
    * @param timestamp The timestamp to search for.
-   * @return the timestamp and offset of the first message whose timestamp is larger than or equal to the
-   *         target timestamp. None will be returned if there is no such message.
+   * @param startingOffset The starting offset to search.
+   * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message.
    */
-  def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
+  def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
     // Get the index entry with a timestamp less than or equal to the target timestamp
     val timestampOffset = timeIndex.lookup(timestamp)
-    val position = index.lookup(timestampOffset.offset).position
+    val position = index.lookup(math.max(timestampOffset.offset, startingOffset)).position
 
     // Search the timestamp
-    Option(log.searchForTimestamp(timestamp, position)).map { timestampAndOffset =>
+    Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
       TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8842724..14e56bd 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -63,13 +63,13 @@ abstract class AbstractFetcherThread(name: String,
   /* callbacks to be defined in subclass */
 
   // process fetched data
-  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
+  protected def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
 
   // handle a partition whose offset is out of range and return a new fetch offset
-  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
+  protected def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
 
   // deal with partitions with errors, potentially due to leadership changes
-  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
+  protected def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
 
   protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): REQ
 
@@ -140,17 +140,17 @@ abstract class AbstractFetcherThread(name: String,
           val partitionId = topicPartition.partition
           Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
             // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
-            if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {
+            if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.fetchOffset) {
               partitionData.error match {
                 case Errors.NONE =>
                   try {
                     val records = partitionData.toRecords
                     val newOffset = records.batches.asScala.lastOption.map(_.nextOffset).getOrElse(
-                      currentPartitionFetchState.offset)
+                      currentPartitionFetchState.fetchOffset)
 
                     fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
+                    processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData)
 
                     val validBytes = records.validBytes
                     if (validBytes > 0) {
@@ -164,18 +164,18 @@ abstract class AbstractFetcherThread(name: String,
                       // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       // should get fixed in the subsequent fetches
-                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset  + " error " + ime.getMessage)
+                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.fetchOffset  + " error " + ime.getMessage)
                       updatePartitionsWithError(topicPartition);
                     case e: Throwable =>
                       throw new KafkaException("error processing data for partition [%s,%d] offset %d"
-                        .format(topic, partitionId, currentPartitionFetchState.offset), e)
+                        .format(topic, partitionId, currentPartitionFetchState.fetchOffset), e)
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
                   try {
                     val newOffset = handleOffsetOutOfRange(topicPartition)
                     partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
                     error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
-                      .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
+                      .format(currentPartitionFetchState.fetchOffset, topic, partitionId, newOffset))
                   } catch {
                     case e: FatalExitError => throw e
                     case e: Throwable =>
@@ -226,7 +226,7 @@ abstract class AbstractFetcherThread(name: String,
       for (partition <- partitions) {
         Option(partitionStates.stateValue(partition)).foreach (currentPartitionFetchState =>
           if (currentPartitionFetchState.isActive)
-            partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
+            partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay)))
         )
       }
       partitionMapCond.signalAll()
@@ -350,11 +350,11 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId:
 /**
   * case class to keep partition offset and its state(active, inactive)
   */
-case class PartitionFetchState(offset: Long, delay: DelayedItem) {
+case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem) {
 
-  def this(offset: Long) = this(offset, new DelayedItem(0))
+  def this(fetchOffset: Long) = this(fetchOffset, new DelayedItem(0))
 
   def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
 
-  override def toString = "%d-%b".format(offset, isActive)
+  override def toString = "%d-%b".format(fetchOffset, isActive)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
new file mode 100644
index 0000000..e5b301c
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+
+import java.util.concurrent.TimeUnit
+
+import com.yammer.metrics.core.Meter
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.DeleteRecordsResponse
+
+import scala.collection._
+
+
+case class DeleteRecordsPartitionStatus(requiredOffset: Long,
+                                        responseStatus: DeleteRecordsResponse.PartitionResponse) {
+  @volatile var acksPending = false
+
+  override def toString = "[acksPending: %b, error: %s, lowWatermark: %d, requiredOffset: %d]"
+    .format(acksPending, responseStatus.error.toString, responseStatus.lowWatermark, requiredOffset)
+}
+
+/**
+ * A delayed delete records operation that can be created by the replica manager and watched
+ * in the delete records operation purgatory
+ */
+class DelayedDeleteRecords(delayMs: Long,
+                           deleteRecordsStatus:  Map[TopicPartition, DeleteRecordsPartitionStatus],
+                           replicaManager: ReplicaManager,
+                           responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit)
+  extends DelayedOperation(delayMs) {
+
+  // first update the acks pending variable according to the error code
+  deleteRecordsStatus.foreach { case (topicPartition, status) =>
+    if (status.responseStatus.error == Errors.NONE) {
+      // Timeout error state will be cleared when required acks are received
+      status.acksPending = true
+      status.responseStatus.error = Errors.REQUEST_TIMED_OUT
+    } else {
+      status.acksPending = false
+    }
+
+    trace("Initial partition status for %s is %s".format(topicPartition, status))
+  }
+
+  /**
+   * The delayed delete records operation can be completed if every partition specified in the request satisfied one of the following:
+   *
+   * 1) There was an error while checking if all replicas have caught up to to the deleteRecordsOffset: set an error in response
+   * 2) The low watermark of the partition has caught up to the deleteRecordsOffset. set the low watermark in response
+   *
+   */
+  override def tryComplete(): Boolean = {
+    // check for each partition if it still has pending acks
+    deleteRecordsStatus.foreach { case (topicPartition, status) =>
+      trace(s"Checking delete records satisfaction for ${topicPartition}, current status $status")
+      // skip those partitions that have already been satisfied
+      if (status.acksPending) {
+        val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match {
+          case Some(partition) =>
+            partition.leaderReplicaIfLocal match {
+              case Some(_) =>
+                val leaderLW = partition.lowWatermarkIfLeader
+                (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
+              case None =>
+                (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+            }
+          case None =>
+            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+        }
+        if (error != Errors.NONE || lowWatermarkReached) {
+          status.acksPending = false
+          status.responseStatus.error = error
+          status.responseStatus.lowWatermark = lw
+        }
+      }
+    }
+
+    // check if every partition has satisfied at least one of case A or B
+    if (!deleteRecordsStatus.values.exists(_.acksPending))
+      forceComplete()
+    else
+      false
+  }
+
+  override def onExpiration() {
+    deleteRecordsStatus.foreach { case (topicPartition, status) =>
+      if (status.acksPending) {
+        DelayedDeleteRecordsMetrics.recordExpiration(topicPartition)
+      }
+    }
+  }
+
+  /**
+   * Upon completion, return the current response status along with the error code per partition
+   */
+  override def onComplete() {
+    val responseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+    responseCallback(responseStatus)
+  }
+}
+
+object DelayedDeleteRecordsMetrics extends KafkaMetricsGroup {
+
+  private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
+
+  def recordExpiration(partition: TopicPartition) {
+    aggregateExpirationMeter.mark()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index a05131a..cbee78a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -150,7 +150,7 @@ class DelayedFetch(delayMs: Long,
     )
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>
-      tp -> FetchPartitionData(result.error, result.hw, result.info.records)
+      tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
     }
 
     responseCallback(fetchPartitionData)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0798efd..defbf34 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -99,6 +99,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
         case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
         case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
+        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -147,7 +148,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       val leaderAndIsrResponse =
         if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
+          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
           new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
         } else {
           val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
@@ -199,7 +200,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val updateMetadataResponse =
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-        val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
+        val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
         if (deletedPartitions.nonEmpty)
           coordinator.handleDeletedPartitions(deletedPartitions)
 
@@ -451,12 +452,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
       case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
     }
 
     val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
       case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
     }
 
     // the callback for sending a fetch response
@@ -474,17 +475,17 @@ class KafkaApis(val requestChannel: RequestChannel,
           val convertedData = replicaManager.getMagic(tp) match {
             case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
               trace(s"Down converting message to V0 for fetch request from $clientId")
-              FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
+              FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
 
             case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
               trace(s"Down converting message to V1 for fetch request from $clientId")
-              FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
+              FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
 
             case _ => data
           }
 
-          tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LSO,
-            null, convertedData.records)
+          tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+            convertedData.logStartOffset, null, convertedData.records)
         }
       }
 
@@ -728,7 +729,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new Array[(Long, Long)](segments.length)
 
     for (i <- segments.indices)
-      offsetTimeArray(i) = (segments(i).baseOffset, segments(i).lastModified)
+      offsetTimeArray(i) = (math.max(segments(i).baseOffset, log.logStartOffset), segments(i).lastModified)
     if (lastSegmentHasSize)
       offsetTimeArray(segments.length) = (log.logEndOffset, time.milliseconds)
 
@@ -1259,6 +1260,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleDeleteRecordsRequest(request: RequestChannel.Request) {
+    val deleteRecordsRequest = request.body[DeleteRecordsRequest]
+
+    val (authorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteRecordsRequest.partitionOffsets.asScala.partition {
+      case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
+    }
+
+    val (authorizedForDeleteTopics, unauthorizedForDeleteTopics) = authorizedForDescribeTopics.partition {
+      case (topicPartition, _) => authorize(request.session, Delete, new Resource(auth.Topic, topicPartition.topic))
+    }
+
+    // the callback for sending a DeleteRecordsResponse
+    def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
+
+      val mergedResponseStatus = responseStatus ++
+        unauthorizedForDeleteTopics.mapValues(_ =>
+          new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++
+        nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
+          new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION))
+
+      mergedResponseStatus.foreach { case (topicPartition, status) =>
+        if (status.error != Errors.NONE) {
+          debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format(
+            request.header.correlationId,
+            request.header.clientId,
+            topicPartition,
+            status.error.exceptionName))
+        }
+      }
+
+      val respBody = new DeleteRecordsResponse(mergedResponseStatus.asJava)
+      requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
+
+      // When this callback is triggered, the remote API call has completed
+      request.apiRemoteCompleteTimeMs = time.milliseconds
+    }
+
+    if (authorizedForDeleteTopics.isEmpty)
+      sendResponseCallback(Map.empty)
+    else {
+      // call the replica manager to append messages to the replicas
+      replicaManager.deleteRecords(
+        deleteRecordsRequest.timeout.toLong,
+        authorizedForDeleteTopics.mapValues(_.toLong),
+        sendResponseCallback)
+    }
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 879bc51..fe6631e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -100,6 +100,7 @@ object Defaults {
   val LogDeleteDelayMs = 60000
   val LogFlushSchedulerIntervalMs = Long.MaxValue
   val LogFlushOffsetCheckpointIntervalMs = 60000
+  val LogFlushStartOffsetCheckpointIntervalMs = 60000
   val LogPreAllocateEnable = false
   // lazy val as `InterBrokerProtocolVersion` is defined later
   lazy val LogMessageFormatVersion = InterBrokerProtocolVersion
@@ -125,6 +126,7 @@ object Defaults {
   val ReplicaHighWatermarkCheckpointIntervalMs = 5000L
   val FetchPurgatoryPurgeIntervalRequests = 1000
   val ProducerPurgatoryPurgeIntervalRequests = 1000
+  val DeleteRecordsPurgatoryPurgeIntervalRequests = 1
   val AutoLeaderRebalanceEnable = true
   val LeaderImbalancePerBrokerPercentage = 10
   val LeaderImbalanceCheckIntervalSeconds = 300
@@ -273,6 +275,7 @@ object KafkaConfig {
   val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
   val LogFlushIntervalMsProp = "log.flush.interval.ms"
   val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
+  val LogFlushStartOffsetCheckpointIntervalMsProp = "log.flush.start.offset.checkpoint.interval.ms"
   val LogPreAllocateProp = "log.preallocate"
   val LogMessageFormatVersionProp = LogConfigPrefix + "message.format.version"
   val LogMessageTimestampTypeProp = LogConfigPrefix + "message.timestamp.type"
@@ -296,6 +299,7 @@ object KafkaConfig {
   val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
   val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
   val ProducerPurgatoryPurgeIntervalRequestsProp = "producer.purgatory.purge.interval.requests"
+  val DeleteRecordsPurgatoryPurgeIntervalRequestsProp = "delete.records.purgatory.purge.interval.requests"
   val AutoLeaderRebalanceEnableProp = "auto.leader.rebalance.enable"
   val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage"
   val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds"
@@ -468,6 +472,7 @@ object KafkaConfig {
   val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk"
   val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in " + LogFlushSchedulerIntervalMsProp + " is used"
   val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point"
+  val LogFlushStartOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of log start offset"
   val LogPreAllocateEnableDoc = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true."
   val LogMessageFormatVersionDoc = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid ApiVersion. " +
     "Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format version, the " +
@@ -521,6 +526,7 @@ object KafkaConfig {
   val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which the high watermark is saved out to disk"
   val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory"
   val ProducerPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the producer request purgatory"
+  val DeleteRecordsPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the delete records request purgatory"
   val AutoLeaderRebalanceEnableDoc = "Enables auto leader balancing. A background thread checks and triggers leader balance if required at regular intervals"
   val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage."
   val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller"
@@ -686,6 +692,7 @@ object KafkaConfig {
       .define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc)
       .define(LogFlushIntervalMsProp, LONG, null, HIGH, LogFlushIntervalMsDoc)
       .define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc)
+      .define(LogFlushStartOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushStartOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushStartOffsetCheckpointIntervalMsDoc)
       .define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc)
       .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
       .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
@@ -710,6 +717,7 @@ object KafkaConfig {
       .define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc)
       .define(FetchPurgatoryPurgeIntervalRequestsProp, INT, Defaults.FetchPurgatoryPurgeIntervalRequests, MEDIUM, FetchPurgatoryPurgeIntervalRequestsDoc)
       .define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc)
+      .define(DeleteRecordsPurgatoryPurgeIntervalRequestsProp, INT, Defaults.DeleteRecordsPurgatoryPurgeIntervalRequests, MEDIUM, DeleteRecordsPurgatoryPurgeIntervalRequestsDoc)
       .define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc)
       .define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc)
       .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
@@ -862,6 +870,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
   val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
   val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
+  val logFlushStartOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp).toLong
   val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
   val logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
   val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
@@ -907,6 +916,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
   val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
   val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
+  val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp)
   val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
   val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
   val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 465b0b7..0d3e49c 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -297,7 +297,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   }
 
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
-    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower)
+    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower, metadataCache)
 
   private def initZk(): ZkUtils = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
@@ -655,7 +655,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                    cleanerConfig = cleanerConfig,
                    ioThreads = config.numRecoveryThreadsPerDataDir,
                    flushCheckMs = config.logFlushSchedulerIntervalMs,
-                   flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+                   flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+                   flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
                    retentionCheckMs = config.logCleanupIntervalMs,
                    scheduler = kafkaScheduler,
                    brokerState = brokerState,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 9a6090d..bf36974 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -132,6 +132,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def isBrokerAlive(brokerId: Int): Boolean = {
+    inReadLock(partitionMetadataLock) {
+      aliveBrokers.contains(brokerId)
+    }
+  }
+
   def getAliveBrokers: Seq[Broker] = {
     inReadLock(partitionMetadataLock) {
       aliveBrokers.values.toBuffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 29a2467..5f055a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -60,7 +60,8 @@ class ReplicaFetcherThread(name: String,
   type PD = PartitionData
 
   private val fetchRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
@@ -136,10 +137,12 @@ class ReplicaFetcherThread(name: String,
         trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
           .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
       val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
+      val leaderLogStartOffset = partitionData.logStartOffset
       // for the follower replica, we do not need to keep
       // its segment base offset the physical position,
       // these values will be computed upon making the leader
       replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
+      replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
       if (logger.isTraceEnabled)
         trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
       if (quota.isThrottled(topicPartition))
@@ -289,8 +292,10 @@ class ReplicaFetcherThread(name: String,
 
     partitionMap.foreach { case (topicPartition, partitionFetchState) =>
       // We will not include a replica in the fetch request if it should be throttled.
-      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
-        requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
+      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition)) {
+        val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
+        requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
+      }
     }
 
     val requestBuilder = JFetchRequest.Builder.forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, requestMap)
@@ -313,7 +318,7 @@ object ReplicaFetcherThread {
   private[server] class FetchRequest(val underlying: JFetchRequest.Builder) extends AbstractFetcherThread.FetchRequest {
     def isEmpty: Boolean = underlying.fetchData().isEmpty
     def offset(topicPartition: TopicPartition): Long =
-      underlying.fetchData().asScala(topicPartition).offset
+      underlying.fetchData().asScala(topicPartition).fetchOffset
   }
 
   private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
@@ -326,6 +331,8 @@ object ReplicaFetcherThread {
 
     def highWatermark: Long = underlying.highWatermark
 
+    def logStartOffset: Long = underlying.logStartOffset
+
     def exception: Option[Throwable] = error match {
       case Errors.NONE => None
       case e => Some(e.exception)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5ba093e..8f67425 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,12 +29,13 @@ import kafka.log.{Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils._
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, PolicyViolationException}
+import org.apache.kafka.common.errors.{NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest, DeleteRecordsRequest, DeleteRecordsResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -52,6 +53,13 @@ case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = N
   }
 }
 
+case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exception: Option[Throwable] = None) {
+  def error: Errors = exception match {
+    case None => Errors.NONE
+    case Some(e) => Errors.forException(e)
+  }
+}
+
 /*
  * Result metadata of a log read operation on the log
  * @param info @FetchDataInfo returned by the @Log read
@@ -63,7 +71,9 @@ case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = N
  */
 case class LogReadResult(info: FetchDataInfo,
                          hw: Long,
+                         leaderLogStartOffset: Long,
                          leaderLogEndOffset: Long,
+                         followerLogStartOffset: Long,
                          fetchTimeMs: Long,
                          readSize: Int,
                          exception: Option[Throwable] = None) {
@@ -74,16 +84,19 @@ case class LogReadResult(info: FetchDataInfo,
   }
 
   override def toString =
-    s"Fetch Data: [$info], HW: [$hw], leaderLogEndOffset: [$leaderLogEndOffset], readSize: [$readSize], error: [$error]"
+    s"Fetch Data: [$info], HW: [$hw], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
+    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], error: [$error]"
 
 }
 
-case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, records: Records)
+case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records)
 
 object LogReadResult {
   val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                                            hw = -1L,
+                                           leaderLogStartOffset = -1L,
                                            leaderLogEndOffset = -1L,
+                                           followerLogStartOffset = -1L,
                                            fetchTimeMs = -1L,
                                            readSize = -1)
 }
@@ -109,6 +122,7 @@ class ReplicaManager(val config: KafkaConfig,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
                      quotaManager: ReplicationQuotaManager,
+                     val metadataCache: MetadataCache,
                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
@@ -130,6 +144,8 @@ class ReplicaManager(val config: KafkaConfig,
     purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests)
   val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
     purgatoryName = "Fetch", localBrokerId, config.fetchPurgatoryPurgeIntervalRequests)
+  val delayedDeleteRecordsPurgatory = DelayedOperationPurgatory[DelayedDeleteRecords](
+    purgatoryName = "DeleteRecords", localBrokerId, config.deleteRecordsPurgatoryPurgeIntervalRequests)
 
   val leaderCount = newGauge(
     "LeaderCount",
@@ -212,6 +228,15 @@ class ReplicaManager(val config: KafkaConfig,
     debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
   }
 
+  /**
+   * Try to complete some delayed DeleteRecordsRequest with the request key;
+   * this needs to be triggered when the partition low watermark has changed
+   */
+  def tryCompleteDelayedDeleteRecords(key: DelayedOperationKey) {
+    val completed = delayedDeleteRecordsPurgatory.checkAndComplete(key)
+    debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed))
+  }
+
   def startup() {
     // start ISR expiration thread
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
@@ -316,7 +341,7 @@ class ReplicaManager(val config: KafkaConfig,
                   new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
       }
 
-      if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
+      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
         val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
@@ -345,14 +370,108 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas;
+   * the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset
+   */
+  private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
+    trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
+    offsetPerPartition.map { case (topicPartition, requestedOffset) =>
+      // reject delete records operation on internal topics
+      if (Topic.isInternal(topicPartition.topic)) {
+        (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
+      } else {
+        try {
+          val partition = getPartition(topicPartition).getOrElse(
+            throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId)))
+          val convertedOffset =
+            if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) {
+              partition.leaderReplicaIfLocal match {
+                case Some(leaderReplica) =>
+                  leaderReplica.highWatermark.messageOffset
+                case None =>
+                  throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+                    .format(topicPartition, localBrokerId))
+              }
+            } else
+              requestedOffset
+          if (convertedOffset < 0)
+            throw new OffsetOutOfRangeException(s"The offset $convertedOffset for partition $topicPartition is not valid")
+
+          val lowWatermark = partition.deleteRecordsOnLeader(convertedOffset)
+          (topicPartition, LogDeleteRecordsResult(convertedOffset, lowWatermark))
+        } catch {
+          // NOTE: Failed produce requests metric is not incremented for known exceptions
+          // it is supposed to indicate un-expected failures of a broker in handling a produce request
+          case e: KafkaStorageException =>
+            fatal("Halting due to unrecoverable I/O error while handling DeleteRecordsRequest: ", e)
+            Runtime.getRuntime.halt(1)
+            (topicPartition, null)
+          case e@ (_: UnknownTopicOrPartitionException |
+                   _: NotLeaderForPartitionException |
+                   _: OffsetOutOfRangeException |
+                   _: PolicyViolationException |
+                   _: NotEnoughReplicasException) =>
+            (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(e)))
+          case t: Throwable =>
+            error("Error processing delete records operation on partition %s".format(topicPartition), t)
+            (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(t)))
+        }
+      }
+    }
+  }
+
+  // If there exists a topic partition that meets the following requirement,
+  // we need to put a delayed DeleteRecordsRequest and wait for the delete records operation to complete
+  //
+  // 1. the delete records operation on this partition is successful
+  // 2. low watermark of this partition is smaller than the specified offset
+  private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = {
+    localDeleteRecordsResults.exists{ case (tp, deleteRecordsResult) =>
+      deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset
+    }
+  }
+
+  def deleteRecords(timeout: Long,
+                    offsetPerPartition: Map[TopicPartition, Long],
+                    responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit) {
+    val timeBeforeLocalDeleteRecords = time.milliseconds
+    val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
+    debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))
+
+    val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
+      topicPartition ->
+        DeleteRecordsPartitionStatus(
+          result.requestedOffset, // requested offset
+          new DeleteRecordsResponse.PartitionResponse(result.lowWatermark, result.error)) // response status
+    }
+
+    if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
+      // create delayed delete records operation
+      val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
+
+      // create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
+      val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+
+      // try to complete the request immediately, otherwise put it into the purgatory
+      // this is because while the delayed delete records operation is being created, new
+      // requests may arrive and hence make this operation completable.
+      delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys)
+    } else {
+      // we can respond immediately
+      val deleteRecordsResponseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+      responseCallback(deleteRecordsResponseStatus)
+    }
+  }
+
   // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
   //
   // 1. required acks = -1
   // 2. there is data to append
   // 3. at least one partition append was successful (fewer errors than partitions)
-  private def delayedRequestRequired(requiredAcks: Short,
-                                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                                     localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
+  private def delayedProduceRequestRequired(requiredAcks: Short,
+                                            entriesPerPartition: Map[TopicPartition, MemoryRecords],
+                                            localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
     requiredAcks == -1 &&
     entriesPerPartition.nonEmpty &&
     localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
@@ -471,7 +590,7 @@ class ReplicaManager(val config: KafkaConfig,
     //                        4) some error happens while reading data
     if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
-        tp -> FetchPartitionData(result.error, result.hw, result.info.records)
+        tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
       }
       responseCallback(fetchPartitionData)
     } else {
@@ -508,8 +627,9 @@ class ReplicaManager(val config: KafkaConfig,
                        quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
 
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
-      val offset = fetchInfo.offset
+      val offset = fetchInfo.fetchOffset
       val partitionFetchSize = fetchInfo.maxBytes
+      val followerLogStartOffset = fetchInfo.logStartOffset
 
       BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
       BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
@@ -539,6 +659,7 @@ class ReplicaManager(val config: KafkaConfig,
          */
         val initialLogEndOffset = localReplica.logEndOffset.messageOffset
         val initialHighWatermark = localReplica.highWatermark.messageOffset
+        val initialLogStartOffset = localReplica.logStartOffset
         val fetchTimeMs = time.milliseconds
         val logReadInfo = localReplica.log match {
           case Some(log) =>
@@ -563,7 +684,9 @@ class ReplicaManager(val config: KafkaConfig,
 
         LogReadResult(info = logReadInfo,
                       hw = initialHighWatermark,
+                      leaderLogStartOffset = initialLogStartOffset,
                       leaderLogEndOffset = initialLogEndOffset,
+                      followerLogStartOffset = followerLogStartOffset,
                       fetchTimeMs = fetchTimeMs,
                       readSize = partitionFetchSize,
                       exception = None)
@@ -576,7 +699,9 @@ class ReplicaManager(val config: KafkaConfig,
                  _: OffsetOutOfRangeException) =>
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                         hw = -1L,
+                        leaderLogStartOffset = -1L,
                         leaderLogEndOffset = -1L,
+                        followerLogStartOffset = -1L,
                         fetchTimeMs = -1L,
                         readSize = partitionFetchSize,
                         exception = Some(e))
@@ -586,7 +711,9 @@ class ReplicaManager(val config: KafkaConfig,
           error(s"Error processing fetch operation on partition $tp, offset $offset", e)
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                         hw = -1L,
+                        leaderLogStartOffset = -1L,
                         leaderLogEndOffset = -1L,
+                        followerLogStartOffset = -1L,
                         fetchTimeMs = -1L,
                         readSize = partitionFetchSize,
                         exception = Some(e))
@@ -622,7 +749,7 @@ class ReplicaManager(val config: KafkaConfig,
   def getMagic(topicPartition: TopicPartition): Option[Byte] =
     getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
 
-  def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] =  {
+  def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
       if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
         val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
@@ -640,7 +767,6 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest,
-                             metadataCache: MetadataCache,
                              onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
     leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
@@ -695,7 +821,7 @@ class ReplicaManager(val config: KafkaConfig,
         else
           Set.empty[Partition]
         val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
+          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
         else
           Set.empty[Partition]
 
@@ -801,8 +927,7 @@ class ReplicaManager(val config: KafkaConfig,
                             epoch: Int,
                             partitionState: Map[Partition, PartitionState],
                             correlationId: Int,
-                            responseMap: mutable.Map[TopicPartition, Errors],
-                            metadataCache: MetadataCache) : Set[Partition] = {
+                            responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
     partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
         "starting the become-follower transition for partition %s")