You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/03/28 16:59:49 UTC

[1/3] kafka git commit: KAFKA-4586; Add purgeDataBefore() API (KIP-107)

Repository: kafka
Updated Branches:
  refs/heads/trunk f3f9a9eaf -> 8b05ad406


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 1e2749f..c9ea05f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -17,16 +17,21 @@
 package kafka.api
 
 import java.util.Collections
+import java.util.concurrent.TimeUnit
 
 import kafka.admin.AdminClient
+import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.server.KafkaConfig
+import java.lang.{Long => JLong}
 import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.ApiKeys
-import org.junit.{Before, Test}
+import org.apache.kafka.common.protocol.{Errors, ApiKeys}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
+import org.junit.{After, Before, Test}
 import org.junit.Assert._
+import scala.collection.JavaConverters._
 
 class AdminClientTest extends IntegrationTestHarness with Logging {
 
@@ -58,18 +63,132 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Before
   override def setUp() {
-    super.setUp
+    super.setUp()
     client = AdminClient.createSimplePlaintext(this.brokerList)
     TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
   }
 
+  @After
+  override def tearDown() {
+    client.close()
+    super.tearDown()
+  }
+
   @Test
-  def testListGroups() {
-    consumers.head.subscribe(Collections.singletonList(topic))
+  def testSeekToBeginningAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(0L, consumer.position(tp))
+
+    client.deleteRecordsBefore(Map((tp, 5L))).get()
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(5L, consumer.position(tp))
+
+    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(10L, consumer.position(tp))
+  }
+
+  @Test
+  def testConsumeAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    var messageCount = 0
     TestUtils.waitUntilTrue(() => {
-      consumers.head.poll(0)
-      !consumers.head.assignment.isEmpty
-    }, "Expected non-empty assignment")
+      messageCount += consumer.poll(0).count()
+      messageCount == 10
+    }, "Expected 10 messages", 3000L)
+
+    client.deleteRecordsBefore(Map((tp, 3L))).get()
+    consumer.seek(tp, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 7
+    }, "Expected 7 messages", 3000L)
+
+    client.deleteRecordsBefore(Map((tp, 8L))).get()
+    consumer.seek(tp, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 2
+    }, "Expected 2 messages", 3000L)
+  }
+
+  @Test
+  def testLogStartOffsetCheckpoint() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+
+    for (i <- 0 until serverCount)
+      killBroker(i)
+    restartDeadBrokers()
+
+    client.close()
+    brokerList = TestUtils.bootstrapServers(servers, listenerName)
+    client = AdminClient.createSimplePlaintext(brokerList)
+
+    TestUtils.waitUntilTrue(() => {
+      // Need to retry if leader is not available for the partition
+      client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
+    }, "Expected low watermark of the partition to be 5L")
+  }
+
+  @Test
+  def testLogStartOffsetAfterDeleteRecords() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    client.deleteRecordsBefore(Map((tp, 3L))).get()
+
+    for (i <- 0 until serverCount)
+      assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
+  }
+
+  @Test
+  def testOffsetsForTimesAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    assertEquals(0L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+    client.deleteRecordsBefore(Map((tp, 5L))).get()
+    assertEquals(5L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+    assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
+  }
+
+  @Test
+  def testDeleteRecordsWithException() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    // Should get success result
+    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+    // OffsetOutOfRangeException if offset > high_watermark
+    assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
+    // TimeoutException if response is not available within user-specified timeout
+    assertEquals(DeleteRecordsResult(-1L, Errors.REQUEST_TIMED_OUT.exception()), client.deleteRecordsBefore(Map((tp, 5L))).get(0, TimeUnit.MILLISECONDS)(tp))
+
+    val nonExistPartition = new TopicPartition(topic, 3)
+    // LeaderNotAvailableException if user tries to delete records of a non-existent partition
+    assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
+                 client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
+  }
+
+  @Test
+  def testListGroups() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
 
     val groups = client.listAllGroupsFlattened
     assertFalse(groups.isEmpty)
@@ -80,11 +199,8 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testListAllBrokerVersionInfo() {
-    consumers.head.subscribe(Collections.singletonList(topic))
-    TestUtils.waitUntilTrue(() => {
-      consumers.head.poll(0)
-      !consumers.head.assignment.isEmpty
-    }, "Expected non-empty assignment")
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
     val brokerVersionInfos = client.listAllBrokerVersionInfo
     val brokers = brokerList.split(",")
     assertEquals(brokers.size, brokerVersionInfos.size)
@@ -98,11 +214,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testGetConsumerGroupSummary() {
-    consumers.head.subscribe(Collections.singletonList(topic))
-    TestUtils.waitUntilTrue(() => {
-      consumers.head.poll(0)
-      !consumers.head.assignment.isEmpty
-    }, "Expected non-empty assignment")
+    subscribeAndWaitForAssignment(topic, consumers.head)
 
     val group = client.describeConsumerGroup(groupId)
     assertEquals("range", group.assignmentStrategy)
@@ -117,11 +229,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testDescribeConsumerGroup() {
-    consumers.head.subscribe(Collections.singletonList(topic))
-    TestUtils.waitUntilTrue(() => {
-      consumers.head.poll(0)
-      !consumers.head.assignment.isEmpty
-    }, "Expected non-empty assignment")
+    subscribeAndWaitForAssignment(topic, consumers.head)
 
     val consumerGroupSummary = client.describeConsumerGroup(groupId)
     assertEquals(1, consumerGroupSummary.consumers.get.size)
@@ -133,4 +241,25 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
     val nonExistentGroup = "non" + groupId
     assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
   }
+
+  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+    consumer.subscribe(Collections.singletonList(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0)
+      !consumer.assignment.isEmpty
+    }, "Expected non-empty assignment")
+  }
+
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                          numRecords: Int,
+                          tp: TopicPartition) {
+    val futures = (0 until numRecords).map { i =>
+      val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
+      debug(s"Sending this record: $record")
+      producer.send(record)
+    }
+
+    futures.foreach(_.get)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index cb4c235..e4cece9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -190,7 +190,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createFetchRequest = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
-    partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 100))
+    partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
     val version = ApiKeys.FETCH.latestVersion
     requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, partitionMap).build()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 4ec77a1..2198bf2 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -251,6 +251,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     restartDeadBrokers()
     checkClosedState(dynamicGroup, 0)
     checkClosedState(manualGroup, numRecords)
+    adminClient.close()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 7870485..ae76eb6 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -92,7 +92,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
         override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
           fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
             (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE,
-              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, null)))
+              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null)))
           }
         }
       }
@@ -111,7 +111,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
 
       override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
         new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
-          quotaManagers.follower) {
+          quotaManagers.follower, metadataCache) {
 
           override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
                                                              quotaManager: ReplicationQuotaManager) =

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index aff3b2f..0c1297f 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -43,6 +43,7 @@ object StressTestLog {
 
     val log = new Log(dir = dir,
                       config = LogConfig(logProperties),
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
                       time = time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 9c29679..34c6775 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -206,7 +206,7 @@ object TestLinearWriteSpeed {
   
   class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
     Utils.delete(dir)
-    val log = new Log(dir, config, 0L, scheduler, Time.SYSTEM)
+    val log = new Log(dir, config, 0L, 0L, scheduler, Time.SYSTEM)
     def write(): Int = {
       log.append(messages, true)
       messages.sizeInBytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 0b1978c..5f97708 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -53,7 +53,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
     val logProps = new Properties()
     logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
     /*configure broker-side compression  */
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append two messages */
     log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index f2dbc6e..3e91f96 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -324,6 +324,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
       val log = new Log(dir = dir,
                         LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)),
+                        logStartOffset = 0L,
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 4d8c836..05d9060 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -151,6 +151,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
 
       val log = new Log(dir = dir,
         LogConfig(logProps),
+        logStartOffset = 0L,
         recoveryPoint = 0L,
         scheduler = time.scheduler,
         time = time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 3690c55..94207ec 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -183,6 +183,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val partitionDir = new File(logDir, "log-0")
     val log = new Log(partitionDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -190,7 +191,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
   private def records(key: Int, value: Int, timestamp: Long) =
     MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 18c1bbe..38eb94c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -775,7 +775,7 @@ class LogCleanerTest extends JUnitSuite {
     messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
 
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
   def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */  }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c87e927..768c073 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -75,6 +75,7 @@ class LogTest extends JUnitSuite {
     // create a log
     val log = new Log(logDir,
                       LogConfig(logProps),
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
                       time = time)
@@ -134,6 +135,7 @@ class LogTest extends JUnitSuite {
     // create a log
     val log = new Log(logDir,
       LogConfig(logProps),
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
       time = time)
@@ -165,7 +167,7 @@ class LogTest extends JUnitSuite {
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     // create a log
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -181,7 +183,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds))
   }
 
@@ -194,7 +196,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
     for(value <- values)
@@ -218,7 +220,7 @@ class LogTest extends JUnitSuite {
   def testAppendAndReadWithNonSequentialOffsets() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -243,7 +245,7 @@ class LogTest extends JUnitSuite {
   def testReadAtLogGap() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -260,7 +262,7 @@ class LogTest extends JUnitSuite {
   def testReadWithMinMessage() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -288,7 +290,7 @@ class LogTest extends JUnitSuite {
   def testReadWithTooSmallMaxLength() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -324,7 +326,7 @@ class LogTest extends JUnitSuite {
 
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "42".getBytes))
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
@@ -355,7 +357,7 @@ class LogTest extends JUnitSuite {
     /* create a multipart log with 100 messages */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
                                                                                 timestamp = time.milliseconds))
@@ -393,7 +395,7 @@ class LogTest extends JUnitSuite {
     /* this log should roll after every messageset */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
@@ -419,7 +421,7 @@ class LogTest extends JUnitSuite {
       val logProps = new Properties()
       logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
       logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
-      val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10))
 
@@ -455,7 +457,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     try {
       log.append(messageSet)
@@ -482,7 +484,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
 
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
 
     try {
       log.append(messageSetWithUnkeyedMessage)
@@ -524,7 +526,7 @@ class LogTest extends JUnitSuite {
     val maxMessageSize = second.sizeInBytes - 1
     val logProps = new Properties()
     logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -550,7 +552,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
         timestamp = time.milliseconds + i * 10))
@@ -576,12 +578,12 @@ class LogTest extends JUnitSuite {
       assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
     }
 
-    log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, time.scheduler, time)
     verifyRecoveredLog(log)
     log.close()
 
     // test recovery case
-    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     verifyRecoveredLog(log)
     log.close()
   }
@@ -597,7 +599,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
 
     val messages = (0 until numMessages).map { i =>
       MemoryRecords.withRecords(100 + i, CompressionType.NONE, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
@@ -621,7 +623,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -633,7 +635,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // reopen the log
-    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
@@ -660,7 +662,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
@@ -670,7 +672,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // The rebuilt time index should be empty
-    log = new Log(logDir, config, recoveryPoint = numMessages + 1, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, time.scheduler, time)
     val segArray = log.logSegments.toArray
     for (i <- 0 until segArray.size - 1) {
       assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
@@ -691,7 +693,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -713,7 +715,7 @@ class LogTest extends JUnitSuite {
     }
 
     // reopen the log
-    log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -739,7 +741,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
 
     // create a log
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (_ <- 1 to msgPerSeg)
@@ -794,7 +796,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer)
     val config = LogConfig(logProps)
-    val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
@@ -838,6 +840,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     val log = new Log(logDir,
                       LogConfig(logProps),
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -869,6 +872,7 @@ class LogTest extends JUnitSuite {
     // create a log
     var log = new Log(logDir,
                       config,
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -879,6 +883,7 @@ class LogTest extends JUnitSuite {
     log.close()
     log = new Log(logDir,
                   config,
+                  logStartOffset = 0L,
                   recoveryPoint = 0L,
                   time.scheduler,
                   time)
@@ -904,6 +909,7 @@ class LogTest extends JUnitSuite {
 
     val log = new Log(logDir,
                       config,
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -944,6 +950,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir,
                       config,
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -958,6 +965,7 @@ class LogTest extends JUnitSuite {
 
     log = new Log(logDir,
                   config,
+                  logStartOffset = 0L,
                   recoveryPoint = 0L,
                   time.scheduler,
                   time)
@@ -968,6 +976,7 @@ class LogTest extends JUnitSuite {
   def testAppendMessageWithNullPayload() {
     val log = new Log(logDir,
                       LogConfig(),
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -981,6 +990,7 @@ class LogTest extends JUnitSuite {
   def testAppendWithOutOfOrderOffsetsThrowsException() {
     val log = new Log(logDir,
       LogConfig(),
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -994,6 +1004,7 @@ class LogTest extends JUnitSuite {
   def testAppendWithNoTimestamp(): Unit = {
     val log = new Log(logDir,
       LogConfig(),
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -1016,6 +1027,7 @@ class LogTest extends JUnitSuite {
       logDir.mkdirs()
       var log = new Log(logDir,
                         config,
+                        logStartOffset = 0L,
                         recoveryPoint = 0L,
                         time.scheduler,
                         time)
@@ -1030,7 +1042,7 @@ class LogTest extends JUnitSuite {
       TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
 
       // attempt recovery
-      log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
+      log = new Log(logDir, config, 0L, recoveryPoint, time.scheduler, time)
       assertEquals(numMessages, log.logEndOffset)
 
       val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList
@@ -1058,6 +1070,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     val log = new Log(logDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -1106,6 +1119,7 @@ class LogTest extends JUnitSuite {
     // create a log and write some messages to it
     var log = new Log(logDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -1116,7 +1130,7 @@ class LogTest extends JUnitSuite {
     // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
     // clean shutdown file exists.
     recoveryPoint = log.logEndOffset
-    log = new Log(logDir, config, 0L, time.scheduler, time)
+    log = new Log(logDir, config, 0L, 0L, time.scheduler, time)
     assertEquals(recoveryPoint, log.logEndOffset)
     cleanShutdownFile.delete()
   }
@@ -1205,6 +1219,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     val log = new Log(logDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -1226,6 +1241,31 @@ class LogTest extends JUnitSuite {
     assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
   }
 
+  @Test
+  def testLogDeletionAfterDeleteRecords() {
+    val set = TestUtils.singletonRecords("test".getBytes)
+    val log = createLog(set.sizeInBytes)
+    // Advancing logStartOffset should let deleteOldSegments() drop segments; begin with 15 appended records.
+    for (_ <- 0 until 15)
+      log.append(set)
+    assertEquals("should have 3 segments", 3, log.numberOfSegments)
+    assertEquals(0L, log.logStartOffset)
+    // Start offset 1: the segment count is expected to stay at 3.
+    log.maybeIncrementLogStartOffset(1)
+    log.deleteOldSegments()
+    assertEquals("should have 3 segments", 3, log.numberOfSegments)
+    assertEquals(1L, log.logStartOffset)
+    // Start offset 6: exactly one segment is expected to be removed.
+    log.maybeIncrementLogStartOffset(6)
+    log.deleteOldSegments()
+    assertEquals("should have 2 segments", 2, log.numberOfSegments)
+    assertEquals(6L, log.logStartOffset)
+    // Start offset 15: only a single segment is expected to remain.
+    log.maybeIncrementLogStartOffset(15)
+    log.deleteOldSegments()
+    assertEquals("should have 1 segment", 1, log.numberOfSegments)
+    assertEquals(15L, log.logStartOffset)
+  }
 
   @Test
   def shouldDeleteSizeBasedSegments() {
@@ -1324,6 +1364,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     val log = new Log(logDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 581a917..3babfc8 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -121,7 +121,7 @@ class AbstractFetcherThreadTest {
       fetchRequest.offsets.mapValues(_ => new TestPartitionData()).toSeq
 
     override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
-      new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.offset) }.toMap)
+      new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap)
   }
 
 
@@ -199,7 +199,7 @@ class AbstractFetcherThreadTest {
       partitionMap.foreach { case (topicPartition, partitionFetchState) =>
         // Add backoff delay check
         if (partitionFetchState.isActive)
-          requestMap.put(topicPartition, partitionFetchState.offset)
+          requestMap.put(topicPartition, partitionFetchState.fetchOffset)
       }
       new DummyFetchRequest(requestMap)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index b350732..eefb35e 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -62,7 +62,7 @@ class FetchRequestTest extends BaseRequestTest {
                                  offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
     topicPartitions.foreach { tp =>
-      partitionMap.put(tp, new FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), maxPartitionBytes))
+      partitionMap.put(tp, new FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), 0L, maxPartitionBytes))
     }
     partitionMap
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 948b5ec..55cfa27 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -60,7 +60,8 @@ class HighwatermarkPersistenceTest {
     val time = new MockTime
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
-      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
+      new MetadataCache(configs.head.brokerId))
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -104,7 +105,8 @@ class HighwatermarkPersistenceTest {
     val time = new MockTime
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
-      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
+      new MetadataCache(configs.head.brokerId))
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 348bfc3..3898d2b 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -53,7 +53,7 @@ class IsrExpirationTest {
   @Before
   def setUp() {
     replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, null, new AtomicBoolean(false),
-      QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
   }
 
   @After
@@ -78,7 +78,9 @@ class IsrExpirationTest {
     for (replica <- partition0.assignedReplicas - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
                                                     hw = 15L,
+                                                    leaderLogStartOffset = 0L,
                                                     leaderLogEndOffset = 15L,
+                                                    followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1))
     var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
@@ -130,7 +132,9 @@ class IsrExpirationTest {
     for (replica <- partition0.assignedReplicas - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
                                                     hw = 10L,
+                                                    leaderLogStartOffset = 0L,
                                                     leaderLogEndOffset = 15L,
+                                                    followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1))
 
@@ -144,7 +148,9 @@ class IsrExpirationTest {
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
       r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
                             hw = 11L,
+                            leaderLogStartOffset = 0L,
                             leaderLogEndOffset = 15L,
+                            followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1))
     }
@@ -161,7 +167,9 @@ class IsrExpirationTest {
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
       r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
                             hw = 15L,
+                            leaderLogStartOffset = 0L,
                             leaderLogEndOffset = 15L,
+                            followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1))
     }
@@ -185,7 +193,9 @@ class IsrExpirationTest {
     for (replica <- partition.assignedReplicas - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY),
                                                     hw = 0L,
+                                                    leaderLogStartOffset = 0L,
                                                     leaderLogEndOffset = 0L,
+                                                    followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1))
     // set the leader and its hw and the hw update time

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index c89e626..c27f8c5 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -587,6 +587,7 @@ class KafkaConfigTest {
         case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
         case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
@@ -603,6 +604,7 @@ class KafkaConfigTest {
         case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
         case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9386a1d..f65884e 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import java.io.File
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{Properties, Random}
+import java.lang.{Long => JLong}
 
 import kafka.admin.AdminUtils
 import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
@@ -31,10 +32,13 @@ import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import scala.collection.JavaConverters._
 
 class LogOffsetTest extends ZooKeeperTestHarness {
   val random = new Random()
@@ -76,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testGetOffsetsBeforeLatestTime() {
+  def testGetOffsetsAfterDeleteRecords() {
     val topicPartition = "kafka-" + 0
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
@@ -93,6 +97,40 @@ class LogOffsetTest extends ZooKeeperTestHarness {
       log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
     log.flush()
 
+    log.maybeIncrementLogStartOffset(3)
+    log.deleteOldSegments()
+
+    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
+    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), offsets)
+
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)),
+      replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
+  }
+
+  @Test
+  def testGetOffsetsBeforeLatestTime() {
+    val topicPartition = "kafka-" + 0
+    val topic = topicPartition.split("-").head
+    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+
+    val logManager = server.getLogManager
+    waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(new TopicPartition(topic, part)).get
+
+    for (_ <- 0 until 20)
+      log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
+    log.flush()
+
     val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f856e52..a720a6a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -40,7 +40,7 @@ class ReplicaManagerQuotasTest {
   val record = new SimpleRecord("some-data-in-a-message".getBytes())
   val topicPartition1 = new TopicPartition("test-topic", 1)
   val topicPartition2 = new TopicPartition("test-topic", 2)
-  val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 100), topicPartition2 -> new PartitionData(0, 100))
+  val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 0, 100), topicPartition2 -> new PartitionData(0, 0, 100))
   var replicaManager: ReplicaManager = null
 
   @Test
@@ -147,6 +147,7 @@ class ReplicaManagerQuotasTest {
 
     //Create log which handles both a regular read and a 0 bytes read
     val log = createMock(classOf[Log])
+    expect(log.logStartOffset).andReturn(0L).anyTimes()
     expect(log.logEndOffset).andReturn(20L).anyTimes()
     expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
 
@@ -173,7 +174,7 @@ class ReplicaManagerQuotasTest {
     replay(logManager)
 
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
 
     //create the two replicas
     for ((p, _) <- fetchInfo) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d48e4f2..e00c142 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -65,7 +65,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
@@ -83,7 +83,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
@@ -100,7 +100,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId), Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
         assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS)
@@ -127,8 +127,12 @@ class ReplicaManagerTest {
     val logProps = new Properties()
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
+    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
+    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+    EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache)
 
     try {
       var produceCallbackFired = false
@@ -145,11 +149,6 @@ class ReplicaManagerTest {
         fetchCallbackFired = true
       }
 
-      val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
-      val metadataCache = EasyMock.createMock(classOf[MetadataCache])
-      EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
-      EasyMock.replay(metadataCache)
-
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
       val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava
 
@@ -159,7 +158,7 @@ class ReplicaManagerTest {
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
       // Append a message.
@@ -178,14 +177,14 @@ class ReplicaManagerTest {
         fetchMinBytes = 100000,
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 100000)),
+        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
         responseCallback = fetchCallback)
 
       // Make this replica the follower
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerSet)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {})
+      rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => {})
 
       assertTrue(produceCallbackFired)
       assertTrue(fetchCallbackFired)
@@ -203,13 +202,16 @@ class ReplicaManagerTest {
     val logProps = new Properties()
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
+    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1), createBroker(1, "host2", 2))
+    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
+    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
+    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(2))).andReturn(true).anyTimes()
+    EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName))
     try {
-      val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1), createBroker(1, "host2", 2))
-      val metadataCache = EasyMock.createMock(classOf[MetadataCache])
-      EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
-      EasyMock.replay(metadataCache)
       
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
       val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
@@ -221,7 +223,7 @@ class ReplicaManagerTest {
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
-      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
       def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
@@ -251,7 +253,7 @@ class ReplicaManagerTest {
         fetchMinBytes = 0,
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 100000)),
+        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
         responseCallback = fetchCallback)
         
       
@@ -267,7 +269,7 @@ class ReplicaManagerTest {
         fetchMinBytes = 0,
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 100000)),
+        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
         responseCallback = fetchCallback)
           
         assertTrue(fetchCallbackFired)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index f33c73a..0129d5d 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -59,7 +59,7 @@ class SimpleFetchTest {
   val partitionId = 0
   val topicPartition = new TopicPartition(topic, partitionId)
 
-  val fetchInfo = Seq(topicPartition -> new PartitionData(0, fetchSize))
+  val fetchInfo = Seq(topicPartition -> new PartitionData(0, 0, fetchSize))
 
   var replicaManager: ReplicaManager = null
 
@@ -75,6 +75,7 @@ class SimpleFetchTest {
 
     // create the log which takes read with either HW max offset or none max offset
     val log = EasyMock.createMock(classOf[Log])
+    EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
     EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn(
@@ -96,7 +97,7 @@ class SimpleFetchTest {
 
     // create the replica manager
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
 
     // add the partition with two replicas, both in ISR
     val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
@@ -111,7 +112,9 @@ class SimpleFetchTest {
     val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
     followerReplica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(leo, MemoryRecords.EMPTY),
                                                           hw = leo.messageOffset,
+                                                          leaderLogStartOffset = 0L,
                                                           leaderLogEndOffset = leo.messageOffset,
+                                                          followerLogStartOffset = 0L,
                                                           fetchTimeMs = time.milliseconds,
                                                           readSize = -1))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1ffc7c3..9ae7195 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -934,7 +934,8 @@ object TestUtils extends Logging {
                    cleanerConfig = cleanerConfig,
                    ioThreads = 4,
                    flushCheckMs = 1000L,
-                   flushCheckpointMs = 10000L,
+                   flushRecoveryOffsetCheckpointMs = 10000L,
+                   flushStartOffsetCheckpointMs = 10000L,
                    retentionCheckMs = 1000L,
                    scheduler = time.scheduler,
                    time = time,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ec29da7..109097c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -65,6 +65,12 @@
         producer's <code>batch.size</code> configuration.</li>
 </ul>
 
+<h5><a id="upgrade_1100_new_protocols" href="#upgrade_1100_new_protocols">New Protocol Versions</a></h5>
+<ul>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore()+API+in+AdminClient">KIP-107</a>: FetchRequest v5 introduces a partition-level <code>log_start_offset</code> field. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore()+API+in+AdminClient">KIP-107</a>: FetchResponse v5 introduces a partition-level <code>log_start_offset</code> field. </li>
+</ul>
+
 <h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>
 <p>0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
 However, please review the <a href="#upgrade_1020_notable">notable changes in 0.10.2.0</a> before upgrading.


[3/3] kafka git commit: KAFKA-4586; Add purgeDataBefore() API (KIP-107)

Posted by jq...@apache.org.
KAFKA-4586; Add purgeDataBefore() API (KIP-107)

Author: Dong Lin <li...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jiangjie Qin <be...@gmail.com>

Closes #2476 from lindong28/KAFKA-4586


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b05ad40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b05ad40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b05ad40

Branch: refs/heads/trunk
Commit: 8b05ad406d4cba6a75d1683b6d8699c3ab28f9d6
Parents: f3f9a9e
Author: Dong Lin <li...@gmail.com>
Authored: Tue Mar 28 09:59:44 2017 -0700
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Tue Mar 28 09:59:44 2017 -0700

----------------------------------------------------------------------
 bin/kafka-delete-records.sh                     |  17 ++
 .../clients/consumer/internals/Fetcher.java     |   4 +-
 .../consumer/internals/RequestFuture.java       |  36 ++--
 .../kafka/common/network/DualSocketChannel.java |   4 +
 .../apache/kafka/common/protocol/ApiKeys.java   |   3 +-
 .../apache/kafka/common/protocol/Protocol.java  | 112 +++++++++++-
 .../apache/kafka/common/record/FileRecords.java |  15 +-
 .../kafka/common/requests/AbstractRequest.java  |   3 +
 .../kafka/common/requests/AbstractResponse.java |   2 +
 .../common/requests/DeleteRecordsRequest.java   | 151 +++++++++++++++
 .../common/requests/DeleteRecordsResponse.java  | 135 ++++++++++++++
 .../kafka/common/requests/FetchRequest.java     |  30 +--
 .../kafka/common/requests/FetchResponse.java    |  35 ++--
 .../clients/consumer/KafkaConsumerTest.java     |   2 +-
 .../clients/consumer/internals/FetcherTest.java |   4 +-
 .../common/requests/RequestResponseTest.java    |  16 +-
 .../main/scala/kafka/admin/AdminClient.scala    | 183 +++++++++++++++++--
 .../kafka/admin/BrokerApiVersionsCommand.scala  |   1 +
 .../kafka/admin/DeleteRecordsCommand.scala      | 117 ++++++++++++
 core/src/main/scala/kafka/api/ApiVersion.scala  |  10 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   2 +-
 .../main/scala/kafka/cluster/Partition.scala    |  58 +++++-
 core/src/main/scala/kafka/cluster/Replica.scala |  34 +++-
 .../kafka/consumer/ConsumerFetcherThread.scala  |   3 +-
 core/src/main/scala/kafka/log/Log.scala         |  98 +++++++---
 core/src/main/scala/kafka/log/LogManager.scala  |  56 +++++-
 core/src/main/scala/kafka/log/LogSegment.scala  |  33 ++--
 .../kafka/server/AbstractFetcherThread.scala    |  26 +--
 .../kafka/server/DelayedDeleteRecords.scala     | 129 +++++++++++++
 .../main/scala/kafka/server/DelayedFetch.scala  |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  67 ++++++-
 .../main/scala/kafka/server/KafkaConfig.scala   |  10 +
 .../main/scala/kafka/server/KafkaServer.scala   |   5 +-
 .../main/scala/kafka/server/MetadataCache.scala |   6 +
 .../kafka/server/ReplicaFetcherThread.scala     |  15 +-
 .../scala/kafka/server/ReplicaManager.scala     | 155 ++++++++++++++--
 .../integration/kafka/api/AdminClientTest.scala | 179 +++++++++++++++---
 .../kafka/api/AuthorizerIntegrationTest.scala   |   2 +-
 .../kafka/api/ConsumerBounceTest.scala          |   1 +
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   4 +-
 .../test/scala/other/kafka/StressTestLog.scala  |   1 +
 .../other/kafka/TestLinearWriteSpeed.scala      |   2 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |   2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   1 +
 .../log/LogCleanerLagIntegrationTest.scala      |   1 +
 .../unit/kafka/log/LogCleanerManagerTest.scala  |   3 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |   2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  97 +++++++---
 .../server/AbstractFetcherThreadTest.scala      |   4 +-
 .../unit/kafka/server/FetchRequestTest.scala    |   2 +-
 .../server/HighwatermarkPersistenceTest.scala   |   6 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |  12 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 +
 .../scala/unit/kafka/server/LogOffsetTest.scala |  40 +++-
 .../kafka/server/ReplicaManagerQuotasTest.scala |   5 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  42 +++--
 .../unit/kafka/server/SimpleFetchTest.scala     |   7 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   3 +-
 docs/upgrade.html                               |   6 +
 59 files changed, 1732 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/bin/kafka-delete-records.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-delete-records.sh b/bin/kafka-delete-records.sh
new file mode 100755
index 0000000..8726f91
--- /dev/null
+++ b/bin/kafka-delete-records.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.DeleteRecordsCommand "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7236653..c2456cc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -200,7 +200,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
 
                             for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                 TopicPartition partition = entry.getKey();
-                                long fetchOffset = request.fetchData().get(partition).offset;
+                                long fetchOffset = request.fetchData().get(partition).fetchOffset;
                                 FetchResponse.PartitionData fetchData = entry.getValue();
                                 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                         resp.requestHeader().apiVersion()));
@@ -722,7 +722,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 }
 
                 long position = this.subscriptions.position(partition);
-                fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
+                fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize));
                 log.trace("Added fetch request for partition {} at offset {} to node {}", partition, position, node);
             } else {
                 log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 2b7c8f3..8515c95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -18,7 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.protocol.Errors;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -46,6 +47,7 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
     private static final Object INCOMPLETE_SENTINEL = new Object();
     private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);
     private final ConcurrentLinkedQueue<RequestFutureListener<T>> listeners = new ConcurrentLinkedQueue<>();
+    private final CountDownLatch completedLatch = new CountDownLatch(1);
 
     /**
      * Check whether the response is ready to be handled
@@ -55,6 +57,10 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
         return result.get() != INCOMPLETE_SENTINEL;
     }
 
+    public boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
+        return completedLatch.await(timeout, unit);
+    }
+
     /**
      * Get the value corresponding to this request (only available if the request succeeded)
      * @return the value set in {@link #complete(Object)}
@@ -112,12 +118,16 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
      * @throws IllegalArgumentException if the argument is an instance of {@link RuntimeException}
      */
     public void complete(T value) {
-        if (value instanceof RuntimeException)
-            throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
+        try {
+            if (value instanceof RuntimeException)
+                throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
 
-        if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
-            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
-        fireSuccess();
+            if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
+                throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
+            fireSuccess();
+        } finally {
+            completedLatch.countDown();
+        }
     }
 
     /**
@@ -127,13 +137,17 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
      * @throws IllegalStateException if the future has already been completed
      */
     public void raise(RuntimeException e) {
-        if (e == null)
-            throw new IllegalArgumentException("The exception passed to raise must not be null");
+        try {
+            if (e == null)
+                throw new IllegalArgumentException("The exception passed to raise must not be null");
 
-        if (!result.compareAndSet(INCOMPLETE_SENTINEL, e))
-            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
+            if (!result.compareAndSet(INCOMPLETE_SENTINEL, e))
+                throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
 
-        fireFailure();
+            fireFailure();
+        } finally {
+            completedLatch.countDown();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java b/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java
new file mode 100644
index 0000000..411dd50
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java
@@ -0,0 +1,4 @@
+package org.apache.kafka.common.network;
+
+public class DualSocketChannel {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b3c59a1..89b2000 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -45,7 +45,8 @@ public enum ApiKeys {
     SASL_HANDSHAKE(17, "SaslHandshake"),
     API_VERSIONS(18, "ApiVersions"),
     CREATE_TOPICS(19, "CreateTopics"),
-    DELETE_TOPICS(20, "DeleteTopics");
+    DELETE_TOPICS(20, "DeleteTopics"),
+    DELETE_RECORDS(21, "DeleteRecords");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 8c3e08c..5d7004a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -492,11 +492,31 @@ public class Protocol {
                                                                                  INT32,
                                                                                  "Maximum bytes to fetch."));
 
+    // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+    public static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema(new Field("partition",
+                                                                                 INT32,
+                                                                                 "Topic partition id."),
+                                                                       new Field("fetch_offset",
+                                                                                 INT64,
+                                                                                 "Message offset."),
+                                                                       new Field("log_start_offset",
+                                                                                 INT64,
+                                                                                 "Earliest available offset of the follower replica. " +
+                                                                                 "The field is only used when request is sent by follower. "),
+                                                                       new Field("max_bytes",
+                                                                                 INT32,
+                                                                                 "Maximum bytes to fetch."));
+
     public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to fetch."),
                                                                    new Field("partitions",
                                                                              new ArrayOf(FETCH_REQUEST_PARTITION_V0),
                                                                              "Partitions to fetch."));
 
+    public static final Schema FETCH_REQUEST_TOPIC_V5 = new Schema(new Field("topic", STRING, "Topic to fetch."),
+                                                                   new Field("partitions",
+                                                                             new ArrayOf(FETCH_REQUEST_PARTITION_V5),
+                                                                             "Partitions to fetch."));
+
     public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
                                                                        INT32,
                                                                        "Broker id of the follower. For normal consumers, use -1."),
@@ -565,6 +585,34 @@ public class Protocol {
                     new ArrayOf(FETCH_REQUEST_TOPIC_V0),
                     "Topics to fetch in the order provided."));
 
+    // FETCH_REQUEST_V5 added a per-partition log_start_offset field - the earliest available offset of partition data that can be consumed.
+    public static final Schema FETCH_REQUEST_V5 = new Schema(
+            new Field("replica_id",
+                    INT32,
+                    "Broker id of the follower. For normal consumers, use -1."),
+            new Field("max_wait_time",
+                    INT32,
+                    "Maximum time in ms to wait for the response."),
+            new Field("min_bytes",
+                    INT32,
+                    "Minimum bytes to accumulate in the response."),
+            new Field("max_bytes",
+                    INT32,
+                    "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
+                    "if the first message in the first non-empty partition of the fetch is larger than this " +
+                    "value, the message will still be returned to ensure that progress can be made."),
+            new Field("isolation_level",
+                    INT8,
+                    "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
+                    "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
+                     "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
+                     "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
+                     "and enables the inclusion of the list of aborted transactions in the result, which allows " +
+                     "consumers to discard ABORTED transactional records"),
+            new Field("topics",
+                    new ArrayOf(FETCH_REQUEST_TOPIC_V5),
+                    "Topics to fetch in the order provided."));
+
     public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
                                                                                          INT32,
                                                                                          "Topic partition id."),
@@ -602,6 +650,8 @@ public class Protocol {
             new Field("pid", INT64, "The producer ID (PID) associated with the aborted transactions"),
             new Field("first_offset", INT64, "The first offset in the aborted transaction"));
 
+    public static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4;
+
     public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
             new Field("partition",
                     INT32,
@@ -617,14 +667,41 @@ public class Protocol {
             new Field("aborted_transactions",
                     ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
 
+    // FETCH_RESPONSE_PARTITION_HEADER_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+    public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V5 = new Schema(
+            new Field("partition",
+                    INT32,
+                    "Topic partition id."),
+            new Field("error_code", INT16),
+            new Field("high_watermark",
+                    INT64,
+                    "Last committed offset."),
+            new Field("last_stable_offset",
+                    INT64,
+                    "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
+                    "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
+            new Field("log_start_offset",
+                    INT64,
+                    "Earliest available offset."),
+            new Field("aborted_transactions",
+                    ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V5)));
+
     public static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
             new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V4),
             new Field("record_set", RECORDS));
 
+    public static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema(
+            new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V5),
+            new Field("record_set", RECORDS));
+
     public static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
             new Field("topic", STRING),
             new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
 
+    public static final Schema FETCH_RESPONSE_TOPIC_V5 = new Schema(
+            new Field("topic", STRING),
+            new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
+
     public static final Schema FETCH_RESPONSE_V4 = new Schema(
             new Field("throttle_time_ms",
                     INT32,
@@ -633,8 +710,16 @@ public class Protocol {
                     0),
             new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
 
-    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4};
+    public static final Schema FETCH_RESPONSE_V5 = new Schema(
+            new Field("throttle_time_ms",
+                    INT32,
+                    "Duration in milliseconds for which the request was throttled " +
+                    "due to quota violation (zero if the request did not violate any quota).",
+                    0),
+            new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
+
+    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5};
 
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
@@ -1070,6 +1155,27 @@ public class Protocol {
     public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0};
     public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0};
 
+    public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
+                                                                                new Field("offset", INT64, "The offset before which the messages will be deleted."));
+
+    public static final Schema DELETE_RECORDS_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+                                                                            new Field("partitions", new ArrayOf(DELETE_RECORDS_REQUEST_PARTITION_V0)));
+
+    public static final Schema DELETE_RECORDS_REQUEST_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_REQUEST_TOPIC_V0)),
+                                                                      new Field("timeout", INT32, "The maximum time to await a response in ms."));
+
+    public static final Schema DELETE_RECORDS_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
+                                                                                 new Field("low_watermark", INT64, "Smallest available offset of all live replicas"),
+                                                                                 new Field("error_code", INT16, "The error code for the given partition."));
+
+    public static final Schema DELETE_RECORDS_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+                                                                             new Field("partitions", new ArrayOf(DELETE_RECORDS_RESPONSE_PARTITION_V0)));
+
+    public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
+    public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1101,6 +1207,7 @@ public class Protocol {
         REQUESTS[ApiKeys.API_VERSIONS.id] = API_VERSIONS_REQUEST;
         REQUESTS[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_REQUEST;
         REQUESTS[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_REQUEST;
+        REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1123,6 +1230,7 @@ public class Protocol {
         RESPONSES[ApiKeys.API_VERSIONS.id] = API_VERSIONS_RESPONSE;
         RESPONSES[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_RESPONSE;
         RESPONSES[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_RESPONSE;
+        RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index b0dcebf..dcd7845 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -286,24 +286,25 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     /**
-     * Search forward for the message whose timestamp is greater than or equals to the target timestamp.
+     * Search forward for the first message that meets the following requirements:
+     * - Message's timestamp is greater than or equal to the targetTimestamp.
+     * - Message's position in the log file is greater than or equal to the startingPosition.
+     * - Message's offset is greater than or equal to the startingOffset.
      *
      * @param targetTimestamp The timestamp to search for.
      * @param startingPosition The starting position to search.
-     * @return The timestamp and offset of the message found. None, if no message is found.
+     * @param startingOffset The starting offset to search.
+     * @return The timestamp and offset of the message found. Null if no message is found.
      */
-    public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
+    public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition, long startingOffset) {
         for (RecordBatch batch : batchesFrom(startingPosition)) {
             if (batch.maxTimestamp() >= targetTimestamp) {
                 // We found a message
                 for (Record record : batch) {
                     long timestamp = record.timestamp();
-                    if (timestamp >= targetTimestamp)
+                    if (timestamp >= targetTimestamp && record.offset() >= startingOffset)
                         return new TimestampAndOffset(timestamp, record.offset());
                 }
-                throw new IllegalStateException(String.format("The message set (max timestamp = %s, max offset = %s)" +
-                        " should contain target timestamp %s but it does not.", batch.maxTimestamp(),
-                        batch.lastOffset(), targetTimestamp));
             }
         }
         return null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 7dc3b62..3a99a8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -168,6 +168,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case DELETE_TOPICS:
                 request = new DeleteTopicsRequest(struct, version);
                 break;
+            case DELETE_RECORDS:
+                request = new DeleteRecordsRequest(struct, version);
+                break;
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index d534daf..a5d0dc4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -91,6 +91,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new CreateTopicsResponse(struct);
             case DELETE_TOPICS:
                 return new DeleteTopicsResponse(struct);
+            case DELETE_RECORDS:
+                return new DeleteRecordsResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
new file mode 100644
index 0000000..f204c44
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.Utils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DeleteRecordsRequest extends AbstractRequest {
+
+    public static final long HIGH_WATERMARK = -1L;
+
+    // request level key names
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String TIMEOUT_KEY_NAME = "timeout";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level key names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String OFFSET_KEY_NAME = "offset";
+
+    private final int timeout;
+    private final Map<TopicPartition, Long> partitionOffsets;
+
+    public static class Builder extends AbstractRequest.Builder<DeleteRecordsRequest> {
+        private final int timeout;
+        private final Map<TopicPartition, Long> partitionOffsets;
+
+        public Builder(int timeout, Map<TopicPartition, Long> partitionOffsets) {
+            super(ApiKeys.DELETE_RECORDS);
+            this.timeout = timeout;
+            this.partitionOffsets = partitionOffsets;
+        }
+
+        @Override
+        public DeleteRecordsRequest build(short version) {
+            return new DeleteRecordsRequest(timeout, partitionOffsets, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("(type=DeleteRecordsRequest")
+                   .append(", timeout=").append(timeout)
+                   .append(", partitionOffsets=(").append(Utils.mkString(partitionOffsets))
+                   .append("))");
+            return builder.toString();
+        }
+    }
+
+
+    public DeleteRecordsRequest(Struct struct, short version) {
+        super(version);
+        partitionOffsets = new HashMap<>();
+        for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicStruct = (Struct) topicStructObj;
+            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionStructObj;
+                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                long offset = partitionStruct.getLong(OFFSET_KEY_NAME);
+                partitionOffsets.put(new TopicPartition(topic, partition), offset);
+            }
+        }
+        timeout = struct.getInt(TIMEOUT_KEY_NAME);
+    }
+
+    public DeleteRecordsRequest(int timeout, Map<TopicPartition, Long> partitionOffsets, short version) {
+        super(version);
+        this.timeout = timeout;
+        this.partitionOffsets = partitionOffsets;
+    }
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.DELETE_RECORDS.requestSchema(version()));
+        Map<String, Map<Integer, Long>> offsetsByTopic = CollectionUtils.groupDataByTopic(partitionOffsets);
+        struct.set(TIMEOUT_KEY_NAME, timeout);
+        List<Struct> topicStructArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, Long>> offsetsByTopicEntry : offsetsByTopic.entrySet()) {
+            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_KEY_NAME, offsetsByTopicEntry.getKey());
+            List<Struct> partitionStructArray = new ArrayList<>();
+            for (Map.Entry<Integer, Long> offsetsByPartitionEntry : offsetsByTopicEntry.getValue().entrySet()) {
+                Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+                partitionStruct.set(PARTITION_KEY_NAME, offsetsByPartitionEntry.getKey());
+                partitionStruct.set(OFFSET_KEY_NAME, offsetsByPartitionEntry.getValue());
+                partitionStructArray.add(partitionStruct);
+            }
+            topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+            topicStructArray.add(topicStruct);
+        }
+        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(Throwable e) {
+        Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> responseMap = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            responseMap.put(entry.getKey(), new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.forException(e)));
+        }
+
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                return new DeleteRecordsResponse(responseMap);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                    versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_RECORDS.latestVersion()));
+        }
+    }
+
+    public int timeout() {
+        return timeout;
+    }
+
+    public Map<TopicPartition, Long> partitionOffsets() {
+        return partitionOffsets;
+    }
+
+    public static DeleteRecordsRequest parse(ByteBuffer buffer, short version) {
+        return new DeleteRecordsRequest(ApiKeys.DELETE_RECORDS.parseRequest(version, buffer), version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
new file mode 100644
index 0000000..45b518b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DeleteRecordsResponse extends AbstractResponse {
+
+    public static final long INVALID_LOW_WATERMARK = -1L;
+
+    // response level key names
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level key names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String LOW_WATERMARK_KEY_NAME = "low_watermark";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private final Map<TopicPartition, PartitionResponse> responses;
+
+    /**
+     * Possible error codes:
+     *
+     * OFFSET_OUT_OF_RANGE (1)
+     * UNKNOWN_TOPIC_OR_PARTITION (3)
+     * NOT_LEADER_FOR_PARTITION (6)
+     * REQUEST_TIMED_OUT (7)
+     * NOT_ENOUGH_REPLICAS (19)
+     * UNKNOWN (-1)
+     */
+
+    public static final class PartitionResponse {
+        public long lowWatermark;
+        public Errors error;
+
+        public PartitionResponse(long lowWatermark, Errors error) {
+            this.lowWatermark = lowWatermark;
+            this.error = error;
+        }
+
+        /**
+         * Formats this response as {low_watermark: N, error: E}.
+         * The two fields are separated by ", " and nothing follows the opening
+         * brace; a null error is rendered as "null" rather than throwing.
+         */
+        @Override
+        public String toString() {
+            // String concatenation converts a null operand to "null", so a null
+            // error cannot raise a NullPointerException here.
+            return "{low_watermark: " + lowWatermark + ", error: " + error + '}';
+        }
+    }
+
+    public DeleteRecordsResponse(Struct struct) {
+        responses = new HashMap<>();
+        for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicStruct = (Struct) topicStructObj;
+            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionStructObj;
+                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                long lowWatermark = partitionStruct.getLong(LOW_WATERMARK_KEY_NAME);
+                Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+                responses.put(new TopicPartition(topic, partition), new PartitionResponse(lowWatermark, error));
+            }
+        }
+    }
+
+    /**
+     * Constructor for version 0.
+     */
+    public DeleteRecordsResponse(Map<TopicPartition, PartitionResponse> responses) {
+        this.responses = responses;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.DELETE_RECORDS.responseSchema(version));
+        Map<String, Map<Integer, PartitionResponse>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
+        List<Struct> topicStructArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, PartitionResponse>> responsesByTopicEntry : responsesByTopic.entrySet()) {
+            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_KEY_NAME, responsesByTopicEntry.getKey());
+            List<Struct> partitionStructArray = new ArrayList<>();
+            for (Map.Entry<Integer, PartitionResponse> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
+                Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+                PartitionResponse response = responsesByPartitionEntry.getValue();
+                partitionStruct.set(PARTITION_KEY_NAME, responsesByPartitionEntry.getKey());
+                partitionStruct.set(LOW_WATERMARK_KEY_NAME, response.lowWatermark);
+                partitionStruct.set(ERROR_CODE_KEY_NAME, response.error.code());
+                partitionStructArray.add(partitionStruct);
+            }
+            topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+            topicStructArray.add(topicStruct);
+        }
+        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+        return struct;
+    }
+
+    public Map<TopicPartition, PartitionResponse> responses() {
+        return this.responses;
+    }
+
+    public static DeleteRecordsResponse parse(ByteBuffer buffer, short version) {
+        return new DeleteRecordsResponse(ApiKeys.DELETE_RECORDS.responseSchema(version).read(buffer));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 7c029ca..b843c66 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -47,9 +47,11 @@ public class FetchRequest extends AbstractRequest {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
+    private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
 
     // default values for older versions where a request level limit did not exist
     public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE;
+    public static final long INVALID_LOG_START_OFFSET = -1L;
 
     private final int replicaId;
     private final int maxWait;
@@ -59,17 +61,19 @@ public class FetchRequest extends AbstractRequest {
     private final LinkedHashMap<TopicPartition, PartitionData> fetchData;
 
     public static final class PartitionData {
-        public final long offset;
+        public final long fetchOffset;
+        public final long logStartOffset;
         public final int maxBytes;
 
-        public PartitionData(long offset, int maxBytes) {
-            this.offset = offset;
+        public PartitionData(long fetchOffset, long logStartOffset, int maxBytes) {
+            this.fetchOffset = fetchOffset;
+            this.logStartOffset = logStartOffset;
             this.maxBytes = maxBytes;
         }
 
         @Override
         public String toString() {
-            return "(offset=" + offset + ", maxBytes=" + maxBytes + ")";
+            return "(offset=" + fetchOffset + ", logStartOffset=" + logStartOffset + ", maxBytes=" + maxBytes + ")";
         }
     }
 
@@ -188,7 +192,9 @@ public class FetchRequest extends AbstractRequest {
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                 long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
                 int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
-                PartitionData partitionData = new PartitionData(offset, maxBytes);
+                long logStartOffset = partitionResponse.hasField(LOG_START_OFFSET_KEY_NAME) ?
+                    partitionResponse.getLong(LOG_START_OFFSET_KEY_NAME) : INVALID_LOG_START_OFFSET;
+                PartitionData partitionData = new PartitionData(offset, logStartOffset, maxBytes);
                 fetchData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
@@ -200,7 +206,8 @@ public class FetchRequest extends AbstractRequest {
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e),
-                    FetchResponse.INVALID_LSO, FetchResponse.INVALID_HIGHWATERMARK, null, MemoryRecords.EMPTY);
+                FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET,
+                null, MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
         return new FetchResponse(responseData, 0);
@@ -240,16 +247,15 @@ public class FetchRequest extends AbstractRequest {
 
     @Override
     protected Struct toStruct() {
-        short version = version();
-        Struct struct = new Struct(ApiKeys.FETCH.requestSchema(version));
+        Struct struct = new Struct(ApiKeys.FETCH.requestSchema(version()));
         List<TopicAndPartitionData<PartitionData>> topicsData = TopicAndPartitionData.batchByTopic(fetchData);
 
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
         struct.set(MAX_WAIT_KEY_NAME, maxWait);
         struct.set(MIN_BYTES_KEY_NAME, minBytes);
-        if (version >= 3)
+        if (struct.hasField(MAX_BYTES_KEY_NAME))
             struct.set(MAX_BYTES_KEY_NAME, maxBytes);
-        if (version >= 4)
+        if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
             struct.set(ISOLATION_LEVEL_KEY_NAME, IsolationLevel.READ_UNCOMMITTED.id());
 
         List<Struct> topicArray = new ArrayList<>();
@@ -261,7 +267,9 @@ public class FetchRequest extends AbstractRequest {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
+                partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.fetchOffset);
+                if (partitionData.hasField(LOG_START_OFFSET_KEY_NAME))
+                    partitionData.set(LOG_START_OFFSET_KEY_NAME, fetchPartitionData.logStartOffset);
                 partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
                 partitionArray.add(partitionData);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f0a0eee..56eb838 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -51,6 +51,7 @@ public class FetchResponse extends AbstractResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
     private static final String LAST_STABLE_OFFSET_KEY_NAME = "last_stable_offset";
+    private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
     private static final String ABORTED_TRANSACTIONS_KEY_NAME = "aborted_transactions";
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
@@ -60,7 +61,8 @@ public class FetchResponse extends AbstractResponse {
 
     private static final int DEFAULT_THROTTLE_TIME = 0;
     public static final long INVALID_HIGHWATERMARK = -1L;
-    public static final long INVALID_LSO = -1L;
+    public static final long INVALID_LAST_STABLE_OFFSET = -1L;
+    public static final long INVALID_LOG_START_OFFSET = -1L;
 
     /**
      * Possible error codes:
@@ -111,19 +113,22 @@ public class FetchResponse extends AbstractResponse {
 
     public static final class PartitionData {
         public final Errors error;
-        public final long lastStableOffset;
         public final long highWatermark;
+        public final long lastStableOffset;
+        public final long logStartOffset;
         public final List<AbortedTransaction> abortedTransactions;
         public final Records records;
 
         public PartitionData(Errors error,
                              long highWatermark,
                              long lastStableOffset,
+                             long logStartOffset,
                              List<AbortedTransaction> abortedTransactions,
                              Records records) {
             this.error = error;
             this.highWatermark = highWatermark;
             this.lastStableOffset = lastStableOffset;
+            this.logStartOffset = logStartOffset;
             this.abortedTransactions = abortedTransactions;
             this.records = records;
         }
@@ -140,6 +145,7 @@ public class FetchResponse extends AbstractResponse {
             return error == that.error &&
                     highWatermark == that.highWatermark &&
                     lastStableOffset == that.lastStableOffset &&
+                    logStartOffset == that.logStartOffset &&
                     (abortedTransactions == null ? that.abortedTransactions == null : abortedTransactions.equals(that.abortedTransactions)) &&
                     (records == null ? that.records == null : records.equals(that.records));
         }
@@ -147,8 +153,9 @@ public class FetchResponse extends AbstractResponse {
         @Override
         public int hashCode() {
             int result = error != null ? error.hashCode() : 0;
-            result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 32));
             result = 31 * result + (int) (highWatermark ^ (highWatermark >>> 32));
+            result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 32));
+            result = 31 * result + (int) (logStartOffset ^ (logStartOffset >>> 32));
             result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
             result = 31 * result + (records != null ? records.hashCode() : 0);
             return result;
@@ -157,15 +164,16 @@ public class FetchResponse extends AbstractResponse {
         @Override
         public String toString() {
             return "(error=" + error + ", highWaterMark=" + highWatermark +
-                    ", lastStableOffset = " + lastStableOffset + ", " +
-                    "abortedTransactions = " + abortedTransactions + ", records=" + records + ")";
+                    ", lastStableOffset = " + lastStableOffset +
+                    ", logStartOffset = " + logStartOffset +
+                    ", abortedTransactions = " + abortedTransactions + ", records=" + records + ")";
         }
     }
 
     /**
      * Constructor for all versions.
      *
-     * From version 3, the entries in `responseData` should be in the same order as the entries in
+     * In version 3 or later, the entries in `responseData` should be in the same order as the entries in
      * `FetchRequest.fetchData`.
      *
      * @param responseData fetched data grouped by topic-partition
@@ -187,10 +195,13 @@ public class FetchResponse extends AbstractResponse {
                 int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME);
                 Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
                 long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
-                long lastStableOffset = INVALID_LSO;
+                long lastStableOffset = INVALID_LAST_STABLE_OFFSET;
                 if (partitionResponseHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME))
                     lastStableOffset = partitionResponseHeader.getLong(LAST_STABLE_OFFSET_KEY_NAME);
-
+                long logStartOffset = INVALID_LOG_START_OFFSET;
+                if (partitionResponseHeader.hasField(LOG_START_OFFSET_KEY_NAME))
+                    logStartOffset = partitionResponseHeader.getLong(LOG_START_OFFSET_KEY_NAME);
+                
                 Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
 
                 List<AbortedTransaction> abortedTransactions = null;
@@ -207,7 +218,7 @@ public class FetchResponse extends AbstractResponse {
                     }
                 }
 
-                PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset,
+                PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset, logStartOffset,
                         abortedTransactions, records);
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
@@ -319,7 +330,7 @@ public class FetchResponse extends AbstractResponse {
                 partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code());
                 partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
 
-                if (version >= 4) {
+                if (partitionDataHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME)) {
                     partitionDataHeader.set(LAST_STABLE_OFFSET_KEY_NAME, fetchPartitionData.lastStableOffset);
 
                     if (fetchPartitionData.abortedTransactions == null) {
@@ -335,6 +346,8 @@ public class FetchResponse extends AbstractResponse {
                         partitionDataHeader.set(ABORTED_TRANSACTIONS_KEY_NAME, abortedTransactionStructs.toArray());
                     }
                 }
+                if (partitionDataHeader.hasField(LOG_START_OFFSET_KEY_NAME))
+                    partitionDataHeader.set(LOG_START_OFFSET_KEY_NAME, fetchPartitionData.logStartOffset);
 
                 partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader);
                 partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records);
@@ -345,7 +358,7 @@ public class FetchResponse extends AbstractResponse {
         }
         struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
 
-        if (version >= 1)
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
             struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
 
         return struct;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7f20472..3dd3983 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1467,7 +1467,7 @@ public class KafkaConsumerTest {
                     builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
                 records = builder.build();
             }
-            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LSO,
+            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
                     null, records));
         }
         return new FetchResponse(tpResponses, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 92150a6..224f83c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -222,7 +222,7 @@ public class FetcherTest {
             public boolean matches(AbstractRequest body) {
                 FetchRequest fetch = (FetchRequest) body;
                 return fetch.fetchData().containsKey(tp) &&
-                        fetch.fetchData().get(tp).offset == offset;
+                        fetch.fetchData().get(tp).fetchOffset == offset;
             }
         };
     }
@@ -966,7 +966,7 @@ public class FetcherTest {
 
     private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
         Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
-                new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LSO, null, records));
+                new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
         return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8a6d69a..ad7260e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -348,7 +348,7 @@ public class RequestResponseTest {
 
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000,
-                FetchResponse.INVALID_LSO, null, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
 
         FetchResponse v0Response = new FetchResponse(responseData, 0);
         FetchResponse v1Response = new FetchResponse(responseData, 10);
@@ -373,11 +373,11 @@ public class RequestResponseTest {
                 new FetchResponse.AbortedTransaction(15, 50)
         );
         responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(Errors.NONE, 100000,
-                FetchResponse.INVALID_LSO, abortedTransactions, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, abortedTransactions, records));
         responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData(Errors.NONE, 900000,
-                5, null, records));
+                5, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
         responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(Errors.NONE, 70000,
-                6, Collections.<FetchResponse.AbortedTransaction>emptyList(), records));
+                6, FetchResponse.INVALID_LOG_START_OFFSET, Collections.<FetchResponse.AbortedTransaction>emptyList(), records));
 
         FetchResponse response = new FetchResponse(responseData, 10);
         FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -458,8 +458,8 @@ public class RequestResponseTest {
 
     private FetchRequest createFetchRequest(int version) {
         LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
-        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
-        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
+        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L, 1000000));
+        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 0L, 1000000));
         return FetchRequest.Builder.forConsumer(100, 100000, fetchData).setMaxBytes(1000).build((short) version);
     }
 
@@ -467,12 +467,12 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE,
-                1000000, FetchResponse.INVALID_LSO, null, records));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
                 new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData(Errors.NONE,
-                1000000, FetchResponse.INVALID_LSO, abortedTransactions, MemoryRecords.EMPTY));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, abortedTransactions, MemoryRecords.EMPTY));
 
         return new FetchResponse(responseData, 25);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 45ba58b..8a9660b 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -12,16 +12,18 @@
  */
 package kafka.admin
 
+import java.io.IOException
 import java.nio.ByteBuffer
 import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicInteger
-
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, Future}
+import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.KafkaException
 import kafka.coordinator.GroupOverview
 import kafka.utils.Logging
 
 import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
+import org.apache.kafka.clients.consumer.internals.{RequestFutureAdapter, ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.TimeoutException
@@ -32,7 +34,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
 import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
@@ -40,18 +42,44 @@ import scala.util.Try
 
 class AdminClient(val time: Time,
                   val requestTimeoutMs: Int,
-                  val retryBackoffMs: Int,
+                  val retryBackoffMs: Long,
                   val client: ConsumerNetworkClient,
                   val bootstrapBrokers: List[Node]) extends Logging {
 
+  @volatile var running: Boolean = true // set to false by close() so that the poll loop below stops
+  val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]() // requests still awaiting a response
+
+  val networkThread = new KafkaThread("admin-client-network-thread", new Runnable {
+    override def run() {
+      try {
+        while (running) { // all network I/O of this client is driven from this loop
+          client.poll(Long.MaxValue)
+        }
+      } catch {
+        case t : Throwable =>
+          error("admin-client-network-thread exited", t)
+      } finally {
+        pendingFutures.asScala.foreach { future => // nobody polls any more, so fail whatever is still pending
+          try {
+            future.raise(Errors.UNKNOWN)
+          } catch {
+            case _: IllegalStateException => // It is OK if the future has been completed
+          }
+        }
+        pendingFutures.clear()
+      }
+    }
+  }, true)
+
+  networkThread.start() // started as part of constructing the AdminClient
+
   private def send(target: Node,
                    api: ApiKeys,
                    request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
-    var future: RequestFuture[ClientResponse] = null
-
-    future = client.send(target, request)
-    client.poll(future)
-
+    val future: RequestFuture[ClientResponse] = client.send(target, request)
+    pendingFutures.add(future)
+    future.awaitDone(Long.MaxValue, TimeUnit.MILLISECONDS)
+    pendingFutures.remove(future)
     if (future.succeeded())
       future.value().responseBody()
     else
@@ -163,6 +191,73 @@ class AdminClient(val time: Time,
       broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
     }.toMap
 
+  /*
+   * Remove all the messages whose offset is smaller than the given offset of the corresponding partition.
+   * get() on the returned Future yields one DeleteRecordsResult for every requested partition.
+   *
+   * DeleteRecordsResult contains either the lowWatermark of the partition or an exception. The possible
+   * exceptions and their interpretations are listed below:
+   * - DisconnectException if leader node of the partition is not available. Need retry by user.
+   * - PolicyViolationException if the topic is configured as non-deletable.
+   * - TopicAuthorizationException if the topic doesn't exist and the user doesn't have the authority to create the topic
+   * - TimeoutException if response is not available within the timeout specified by either Future's timeout or AdminClient's request timeout
+   * - UnknownTopicOrPartitionException if the partition doesn't exist or if the user doesn't have the authority to describe the topic
+   * - NotLeaderForPartitionException if broker is not leader of the partition. Need retry by user.
+   * - OffsetOutOfRangeException if the offset is larger than high watermark of this partition
+   * - the exception of Errors.LEADER_NOT_AVAILABLE if the metadata response has no leader for the partition
+   */
+
+  def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = {
+    val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic()).toSet.toList.asJava)
+    val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse]
+    val errors = response.errors
+    if (!errors.isEmpty)
+      error(s"Metadata request contained errors: $errors")
+
+    val (partitionsWithoutError, partitionsWithError) = offsets.partition{ partitionAndOffset =>
+      !response.errors().containsKey(partitionAndOffset._1.topic())}
+
+    val (partitionsWithLeader, partitionsWithoutLeader) = partitionsWithoutError.partition{ partitionAndOffset =>
+      response.cluster().leaderFor(partitionAndOffset._1) != null}
+
+    val partitionsWithErrorResults = partitionsWithError.keys.map( partition =>
+      partition -> DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, response.errors().get(partition.topic()).exception())).toMap
+
+    val partitionsWithoutLeaderResults = partitionsWithoutLeader.mapValues( _ =>
+      DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.LEADER_NOT_AVAILABLE.exception()))
+
+    val partitionsGroupByLeader = partitionsWithLeader.groupBy(partitionAndOffset =>
+      response.cluster().leaderFor(partitionAndOffset._1))
+
+    // prepare requests and generate Future objects: one DeleteRecordsRequest per leader node
+    val futures = partitionsGroupByLeader.map{ case (node, partitionAndOffsets) =>
+      val convertedMap: java.util.Map[TopicPartition, java.lang.Long] = partitionAndOffsets.mapValues(_.asInstanceOf[java.lang.Long]).asJava
+      val requestFuture = client.send(node, new DeleteRecordsRequest.Builder(requestTimeoutMs, convertedMap))
+      pendingFutures.add(requestFuture)
+      requestFuture.compose(new RequestFutureAdapter[ClientResponse, Map[TopicPartition, DeleteRecordsResult]]() {
+          override def onSuccess(response: ClientResponse, resultFuture: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
+            val deleteRecordsResponse = response.responseBody().asInstanceOf[DeleteRecordsResponse]
+            val result = deleteRecordsResponse.responses().asScala.mapValues(v => DeleteRecordsResult(v.lowWatermark, v.error.exception())).toMap
+            resultFuture.complete(result)
+            pendingFutures.remove(requestFuture) // untrack the request future that was queued, not the composed one
+          }
+
+          override def onFailure(e: RuntimeException, resultFuture: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
+            val result = partitionAndOffsets.mapValues(_ => DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, e))
+            resultFuture.complete(result) // a failed request is reported per partition rather than raised
+            pendingFutures.remove(requestFuture) // untrack the request future that was queued, not the composed one
+          }
+
+        })
+    }
+
+    // default output if not receiving DeleteRecordsResponse before timeout
+    val defaultResults = offsets.mapValues(_ =>
+      DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.REQUEST_TIMED_OUT.exception())) ++ partitionsWithErrorResults ++ partitionsWithoutLeaderResults
+
+    new CompositeFuture(time, defaultResults, futures.toList)
+  }
+
   /**
    * Case class used to represent a consumer of a consumer group
    */
@@ -221,11 +316,53 @@ class AdminClient(val time: Time,
   }
 
   def close() {
-    client.close()
+    running = false
+    try {
+      client.close()
+    } catch {
+      case e: IOException =>
+        error("Exception closing nioSelector:", e)
+    }
   }
 
 }
 
+/*
+ * A Future that merges the per-request results of several RequestFutures into one map.
+ * It assumes that none of the futures in the futures list is completed with an error.
+ */
+class CompositeFuture[T](time: Time,
+                         defaultResults: Map[TopicPartition, T],
+                         futures: List[RequestFuture[Map[TopicPartition, T]]]) extends Future[Map[TopicPartition, T]] {
+
+  // cancellation is not supported
+  override def cancel(interrupt: Boolean) = false
+
+  override def isCancelled = false
+
+  override def isDone: Boolean = futures.forall(_.isDone)
+
+  // wait with the largest timeout that can be expressed
+  override def get(): Map[TopicPartition, T] = get(Long.MaxValue, TimeUnit.MILLISECONDS)
+
+  /*
+   * Wait for the futures one after another, all against a single deadline of `timeout`. Partitions of
+   * futures that did not complete in time keep their value from defaultResults.
+   */
+  override def get(timeout: Long, unit: TimeUnit): Map[TopicPartition, T] = {
+    val startMs = time.milliseconds()
+    val timeoutMs = unit.toMillis(timeout)
+
+    val observedResults = futures.flatMap { future =>
+      val remainingMs = math.max(timeoutMs - (time.milliseconds() - startMs), 0L)
+      if (future.awaitDone(remainingMs, TimeUnit.MILLISECONDS)) future.value()
+      else Map.empty[TopicPartition, T]
+    }.toMap
+
+    defaultResults ++ observedResults
+  }
+}
+
 object AdminClient {
   val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
   val DefaultRequestTimeoutMs = 5000
@@ -249,11 +386,25 @@ object AdminClient {
         CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
         ConfigDef.Importance.MEDIUM,
         CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+      .define(
+        CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
+        ConfigDef.Type.INT,
+        DefaultRequestTimeoutMs,
+        ConfigDef.Importance.MEDIUM,
+        CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+      .define(
+        CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
+        ConfigDef.Type.LONG,
+        DefaultRetryBackoffMs,
+        ConfigDef.Importance.MEDIUM,
+        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
       .withClientSslSupport()
       .withClientSaslSupport()
     config
   }
 
+  case class DeleteRecordsResult(lowWatermark: Long, error: Exception)
+
   class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
 
   def createSimplePlaintext(brokerUrl: String): AdminClient = {
@@ -270,6 +421,8 @@ object AdminClient {
     val metrics = new Metrics(time)
     val metadata = new Metadata
     val channelBuilder = ClientUtils.createChannelBuilder(config)
+    val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
+    val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
 
     val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
     val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
@@ -291,7 +444,7 @@ object AdminClient {
       DefaultReconnectBackoffMs,
       DefaultSendBufferBytes,
       DefaultReceiveBufferBytes,
-      DefaultRequestTimeoutMs,
+      requestTimeoutMs,
       time,
       true,
       new ApiVersions)
@@ -300,13 +453,13 @@ object AdminClient {
       networkClient,
       metadata,
       time,
-      DefaultRetryBackoffMs,
-      DefaultRequestTimeoutMs)
+      retryBackoffMs,
+      requestTimeoutMs.toLong)
 
     new AdminClient(
       time,
-      DefaultRequestTimeoutMs,
-      DefaultRetryBackoffMs,
+      requestTimeoutMs,
+      retryBackoffMs,
       highLevelClient,
       bootstrapCluster.nodes.asScala.toList)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index ac94a7e..b87c856 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -48,6 +48,7 @@ object BrokerApiVersionsCommand {
         case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
       }
     }
+    adminClient.close()
   }
 
   private def createAdminClient(opts: BrokerVersionCommandOptions): AdminClient = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
new file mode 100644
index 0000000..b85a6ff
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.io.PrintStream
+import java.util.Properties
+
+import kafka.admin.AdminClient.DeleteRecordsResult
+import kafka.common.AdminCommandFailedException
+import kafka.utils.{CoreUtils, Json, CommandLineUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.CommonClientConfigs
+import joptsimple._
+
+import scala.util.{Failure, Success}
+
+/**
+ * A command for deleting the records of the given partitions that are below the specified offsets.
+ */
+object DeleteRecordsCommand {
+
+  def main(args: Array[String]): Unit = execute(args, System.out)
+
+  /** Reads the (partition, offset) pairs listed under "partitions"; empty if parsing yields nothing or the key is absent. */
+  def parseOffsetJsonStringWithoutDedup(jsonData: String): Seq[(TopicPartition, Long)] = {
+    Json.parseFull(jsonData) match {
+      case Some(m) =>
+        m.asInstanceOf[Map[String, Any]].get("partitions") match {
+          case Some(partitionsSeq) =>
+            partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
+              val topic = p.get("topic").get.asInstanceOf[String]
+              val partition = p.get("partition").get.asInstanceOf[Int]
+              val offset = p.get("offset").get.asInstanceOf[Int].toLong
+              new TopicPartition(topic, partition) -> offset
+            })
+          case None => Seq.empty
+        }
+      case None => Seq.empty
+    }
+  }
+
+  /** Deletes the records named in the offset json file, prints the per-partition outcome to `out` and always closes the admin client. */
+  def execute(args: Array[String], out: PrintStream): Unit = {
+    val opts = new DeleteRecordsCommandOptions(args)
+    val adminClient = createAdminClient(opts)
+    try {
+      val offsetJsonFile = opts.options.valueOf(opts.offsetJsonFileOpt)
+      val offsetJsonString = Utils.readFileAsString(offsetJsonFile)
+      val offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString)
+
+      val duplicatePartitions = CoreUtils.duplicates(offsetSeq.map { case (tp, _) => tp })
+      if (duplicatePartitions.nonEmpty)
+        throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
+
+      out.println("Executing records delete operation")
+      val deleteRecordsResult: Map[TopicPartition, DeleteRecordsResult] = adminClient.deleteRecordsBefore(offsetSeq.toMap).get()
+      out.println("Records delete operation completed:")
+      deleteRecordsResult.foreach { case (tp, partitionResult) =>
+        if (partitionResult.error == null)
+          out.println(s"partition: $tp\tlow_watermark: ${partitionResult.lowWatermark}")
+        else
+          out.println(s"partition: $tp\terror: ${partitionResult.error.toString}")
+      }
+    } finally {
+      adminClient.close()
+    }
+  }
+
+  private def createAdminClient(opts: DeleteRecordsCommandOptions): AdminClient = {
+    val props = if (opts.options.has(opts.commandConfigOpt))
+      Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+    else
+      new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+    AdminClient.create(props)
+  }
+
+  class DeleteRecordsCommandOptions(args: Array[String]) {
+    val BootstrapServerDoc = "REQUIRED: The server to connect to."
+    val offsetJsonFileDoc = "REQUIRED: The JSON file with offset per partition. The format to use is:\n" +
+                                 "{\"partitions\":\n  [{\"topic\": \"foo\", \"partition\": 1, \"offset\": 1}],\n \"version\":1\n}"
+    val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
+
+    val parser = new OptionParser
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
+                                   .withRequiredArg
+                                   .describedAs("server(s) to use for bootstrapping")
+                                   .ofType(classOf[String])
+    val offsetJsonFileOpt = parser.accepts("offset-json-file", offsetJsonFileDoc)
+                                   .withRequiredArg
+                                   .describedAs("Offset json file path")
+                                   .ofType(classOf[String])
+    val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+                                   .withRequiredArg
+                                   .describedAs("command config property file path")
+                                   .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+    CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 2ed6452..6101d2a 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -65,7 +65,9 @@ object ApiVersion {
     "0.10.2" -> KAFKA_0_10_2_IV0,
     // KIP-98 (idempotent and transactional producer support)
     "0.11.0-IV0" -> KAFKA_0_11_0_IV0,
-    "0.11.0" -> KAFKA_0_11_0_IV0
+    // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+    "0.11.0-IV1" -> KAFKA_0_11_0_IV1,
+    "0.11.0" -> KAFKA_0_11_0_IV1
   )
 
   private val versionPattern = "\\.".r
@@ -155,3 +157,9 @@ case object KAFKA_0_11_0_IV0 extends ApiVersion {
   val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
   val id: Int = 10
 }
+
+case object KAFKA_0_11_0_IV1 extends ApiVersion {
+  val version: String = "0.11.0-IV1" // same string as the key this object is registered under in the version map
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val id: Int = 11 // KAFKA_0_11_0_IV0 already uses 10; a newer version needs its own, larger id
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index f91a3c3..39da605 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -204,7 +204,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
       responseData.put(new TopicPartition(topic, partition),
         new JFetchResponse.PartitionData(Errors.forException(e), JFetchResponse.INVALID_HIGHWATERMARK,
-          JFetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+          JFetchResponse.INVALID_LAST_STABLE_OFFSET, JFetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
     }
     val errorResponse = new JFetchResponse(responseData, 0)
     // Magic value does not matter here because the message set is empty


[2/3] kafka git commit: KAFKA-4586; Add purgeDataBefore() API (KIP-107)

Posted by jq...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c2d34d9..ddb2411 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -28,7 +28,7 @@ import kafka.controller.KafkaController
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException}
+import org.apache.kafka.common.errors.{PolicyViolationException, NotEnoughReplicasException, NotLeaderForPartitionException}
 import org.apache.kafka.common.protocol.Errors
 
 import scala.collection.JavaConverters._
@@ -235,10 +235,20 @@ class Partition(val topic: String,
   def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) {
     getReplica(replicaId) match {
       case Some(replica) =>
+        // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
+        val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
         replica.updateLogReadResult(logReadResult)
+        val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
+        // check if the LW of the partition has incremented
+        // since the replica's logStartOffset may have incremented
+        val leaderLWIncremented = newLeaderLW > oldLeaderLW
         // check if we need to expand ISR to include this replica
         // if it is not in the ISR yet
-        maybeExpandIsr(replicaId, logReadResult)
+        val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)
+
+        // some delayed operations may be unblocked after HW or LW changed
+        if (leaderLWIncremented || leaderHWIncremented)
+          tryCompleteDelayedRequests()
 
         debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
           .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition))
@@ -263,8 +273,8 @@ class Partition(val topic: String,
    *
    * This function can be triggered when a replica's LEO has incremented
    */
-  def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) {
-    val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
+  def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
+    inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
@@ -280,18 +290,12 @@ class Partition(val topic: String,
             updateIsr(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
           }
-
           // check if the HW of the partition can now be incremented
           // since the replica may already be in the ISR and its LEO has just incremented
           maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)
-
         case None => false // nothing to do if no longer leader
       }
     }
-
-    // some delayed operations may be unblocked after HW changed
-    if (leaderHWIncremented)
-      tryCompleteDelayedRequests()
   }
 
   /*
@@ -376,12 +380,25 @@ class Partition(val topic: String,
   }
 
   /**
+   * The low watermark offset value, calculated only if the local replica is the partition leader
+   * It is only used by leader broker to decide when DeleteRecordsRequest is satisfied. Its value is minimum logStartOffset of all live replicas
+   * Low watermark will increase when the leader broker receives either FetchRequest or DeleteRecordsRequest.
+   */
+  def lowWatermarkIfLeader: Long = {
+    if (!isLeaderReplicaLocal)
+      throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d".format(topicPartition, localBrokerId))
+    assignedReplicas.filter(replica =>
+      replicaManager.metadataCache.isBrokerAlive(replica.brokerId)).map(_.logStartOffset).reduceOption(_ min _).getOrElse(0L)
+  }
+
+  /**
    * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
    */
   private def tryCompleteDelayedRequests() {
     val requestKey = new TopicPartitionOperationKey(topicPartition)
     replicaManager.tryCompleteDelayedFetch(requestKey)
     replicaManager.tryCompleteDelayedProduce(requestKey)
+    replicaManager.tryCompleteDelayedDeleteRecords(requestKey)
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
@@ -467,6 +484,27 @@ class Partition(val topic: String,
     info
   }
 
+  /**
+   * Update logStartOffset and low watermark if 1) offset <= highWatermark and 2) it is the leader replica.
+   * This function can trigger log segment deletion and log rolling. The cleanup policy is validated before the
+   * log start offset is touched, so a PolicyViolationException never leaves a partially applied delete behind.
+   * Throws NotLeaderForPartitionException if the leader is not local. Return low watermark of the partition.
+   */
+  def deleteRecordsOnLeader(offset: Long): Long = {
+    inReadLock(leaderIsrUpdateLock) {
+      leaderReplicaIfLocal match {
+        case Some(leaderReplica) =>
+          if (!leaderReplica.log.get.config.delete)
+            throw new PolicyViolationException("Records of partition %s can not be deleted due to the configured policy".format(topicPartition))
+          leaderReplica.maybeIncrementLogStartOffset(offset)
+          lowWatermarkIfLeader
+        case None =>
+          throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+            .format(topicPartition, localBrokerId))
+      }
+    }
+  }
+
   private def updateIsr(newIsr: Set[Replica]) {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 8597b06..3995f9e 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -21,7 +21,8 @@ import kafka.log.Log
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
-import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+
 
 import org.apache.kafka.common.utils.Time
 
@@ -35,6 +36,9 @@ class Replica(val brokerId: Int,
   // the log end offset value, kept in all replicas;
   // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
   @volatile private[this] var logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
+  // the log start offset value, kept in all replicas;
+  // for local replica it is the log's start offset, for remote replicas its value is only updated by follower fetch
+  @volatile private[this] var _logStartOffset = Log.UnknownLogStartOffset
 
   // The log end offset value at the time the leader received the last FetchRequest from this follower
   // This is used to determine the lastCaughtUpTimeMs of the follower
@@ -72,6 +76,7 @@ class Replica(val brokerId: Int,
     else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
       _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)
 
+    logStartOffset = logReadResult.followerLogStartOffset
     logEndOffset = logReadResult.info.fetchOffsetMetadata
     lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
     lastFetchTimeMs = logReadResult.fetchTimeMs
@@ -98,6 +103,33 @@ class Replica(val brokerId: Int,
     else
       logEndOffsetMetadata
 
+  def maybeIncrementLogStartOffset(offset: Long) {
+    if (isLocal) {
+      if (highWatermark.messageOffset < offset)
+        throw new OffsetOutOfRangeException(s"The specified offset $offset is higher than the high watermark" +
+                                            s" ${highWatermark.messageOffset} of the partition $topicPartition")
+      log.get.maybeIncrementLogStartOffset(offset)
+    } else {
+      throw new KafkaException(s"Should not try to delete records on partition $topicPartition's non-local replica $brokerId")
+    }
+  }
+
+  private def logStartOffset_=(newLogStartOffset: Long) {
+    if (isLocal) {
+      throw new KafkaException(s"Should not set log start offset on partition $topicPartition's local replica $brokerId " +
+                               s"without attempting to delete records of the log")
+    } else {
+      _logStartOffset = newLogStartOffset
+      trace(s"Setting log start offset for remote replica $brokerId for partition $topicPartition to [$newLogStartOffset]")
+    }
+  }
+
+  def logStartOffset =
+    if (isLocal)
+      log.get.logStartOffset
+    else
+      _logStartOffset
+
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
       highWatermarkMetadata = newHighWatermark

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 75b1f24..c4b7ce6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -101,8 +101,7 @@ class ConsumerFetcherThread(name: String,
   protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
     partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
       if (partitionFetchState.isActive)
-        fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.offset,
-          fetchSize)
+        fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.fetchOffset, fetchSize)
     }
 
     new FetchRequest(fetchRequestBuilder.build())

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d2cac23..96535b1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -79,6 +79,17 @@ case class LogAppendInfo(var firstOffset: Long,
  *
  * @param dir The directory in which log segments are created.
  * @param config The log configuration settings
+ * @param logStartOffset The earliest offset allowed to be exposed to kafka client.
+ *                       The logStartOffset can be updated by :
+ *                       - user's DeleteRecordsRequest
+ *                       - broker's log retention
+ *                       - broker's log truncation
+ *                       The logStartOffset is used to decide the following:
+ *                       - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
+ *                         It may trigger log rolling if the active segment is deleted.
+ *                       - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
+ *                         we make sure that logStartOffset <= log's highWatermark
+ *                       Other activities such as log cleaning are not affected by logStartOffset.
  * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
  * @param scheduler The thread pool scheduler used for background actions
  * @param time The time instance used for checking the clock
@@ -87,6 +98,7 @@ case class LogAppendInfo(var firstOffset: Long,
 @threadsafe
 class Log(@volatile var dir: File,
           @volatile var config: LogConfig,
+          @volatile var logStartOffset: Long = 0L,
           @volatile var recoveryPoint: Long = 0L,
           scheduler: Scheduler,
           time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
@@ -118,8 +130,10 @@ class Log(@volatile var dir: File,
     nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
       activeSegment.size.toInt)
 
-    info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
-      .format(name, segments.size(), logEndOffset, time.milliseconds - startMs))
+    logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+
+    info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
+      .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs))
   }
 
   val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
@@ -443,6 +457,20 @@ class Log(@volatile var dir: File,
     }
   }
 
+  /*
+   * Increment the log start offset if the provided offset is larger.
+   */
+  def maybeIncrementLogStartOffset(offset: Long) {
+    // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
+    // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
+    // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
+    lock synchronized {
+      if (offset > logStartOffset) {
+        logStartOffset = offset
+      }
+    }
+  }
+
   /**
    * Validate the following:
    * <ol>
@@ -543,7 +571,7 @@ class Log(@volatile var dir: File,
    * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
    * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
    *
-   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
+   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
    * @return The fetch data information including fetch starting offset metadata and messages read.
    */
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
@@ -558,9 +586,9 @@ class Log(@volatile var dir: File,
 
     var entry = segments.floorEntry(startOffset)
 
-    // attempt to read beyond the log end offset is an error
-    if(startOffset > next || entry == null)
-      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
+    // return error on attempt to read beyond the log end offset or read below log start offset
+    if(startOffset > next || entry == null || startOffset < logStartOffset)
+      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
 
     // Do the read on the segment with a base offset less than the target offset
     // but if that segment doesn't contain any messages with an offset greater than that
@@ -626,7 +654,7 @@ class Log(@volatile var dir: File,
     val segmentsCopy = logSegments.toBuffer
     // For the earliest and latest, we do not need to return the timestamp.
     if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
+        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
     else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
         return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
 
@@ -640,7 +668,7 @@ class Log(@volatile var dir: File,
         None
     }
 
-    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))
+    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
   }
 
   /**
@@ -666,16 +694,21 @@ class Log(@volatile var dir: File,
   private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
     lock synchronized {
       val deletable = deletableSegments(predicate)
-      val numToDelete = deletable.size
-      if (numToDelete > 0) {
-        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
-        if (segments.size == numToDelete)
-          roll()
-        // remove the segments for lookups
-        deletable.foreach(deleteSegment)
-      }
-      numToDelete
+      deleteSegments(deletable)
+    }
+  }
+
+  private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
+    val numToDelete = deletable.size
+    if (numToDelete > 0) {
+      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
+      if (segments.size == numToDelete)
+        roll()
+      // remove the segments for lookups
+      deletable.foreach(deleteSegment)
+      logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
     }
+    numToDelete
   }
 
   /**
@@ -696,10 +729,10 @@ class Log(@volatile var dir: File,
     */
   def deleteOldSegments(): Int = {
     if (!config.delete) return 0
-    deleteRetenionMsBreachedSegments() + deleteRetentionSizeBreachedSegments()
+    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
   }
 
-  private def deleteRetenionMsBreachedSegments() : Int = {
+  private def deleteRetentionMsBreachedSegments() : Int = {
     if (config.retentionMs < 0) return 0
     val startMs = time.milliseconds
     deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
@@ -719,16 +752,27 @@ class Log(@volatile var dir: File,
     deleteOldSegments(shouldDelete)
   }
 
+  private def deleteLogStartOffsetBreachedSegments() : Int = {
+    // keep active segment to avoid frequent log rolling due to user's DeleteRecordsRequest
+    lock synchronized {
+      val deletable = {
+        if (segments.size() < 2)
+          Seq.empty
+        else
+          logSegments.sliding(2).takeWhile { iterable =>
+            val nextSegment = iterable.toSeq(1)
+            nextSegment.baseOffset <= logStartOffset
+          }.map(_.toSeq(0)).toSeq
+      }
+      deleteSegments(deletable)
+    }
+  }
+
   /**
    * The size of the log in bytes
    */
   def size: Long = logSegments.map(_.size).sum
 
-   /**
-   * The earliest message offset in the log
-   */
-  def logStartOffset: Long = logSegments.head.baseOffset
-
   /**
    * The offset metadata of the next message that will be appended to the log
    */
@@ -789,7 +833,7 @@ class Log(@volatile var dir: File,
   def roll(expectedNextOffset: Long = 0): LogSegment = {
     val start = time.nanoseconds
     lock synchronized {
-      val newOffset = Math.max(expectedNextOffset, logEndOffset)
+      val newOffset = math.max(expectedNextOffset, logEndOffset)
       val logFile = logFilename(dir, newOffset)
       val indexFile = indexFilename(dir, newOffset)
       val timeIndexFile = timeIndexFilename(dir, newOffset)
@@ -895,6 +939,7 @@ class Log(@volatile var dir: File,
         activeSegment.truncateTo(targetOffset)
         updateLogEndOffset(targetOffset)
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
+        this.logStartOffset = math.min(targetOffset, this.logStartOffset)
       }
     }
   }
@@ -920,6 +965,7 @@ class Log(@volatile var dir: File,
                                 preallocate = config.preallocate))
       updateLogEndOffset(newOffset)
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
+      this.logStartOffset = newOffset
     }
   }
 
@@ -1082,6 +1128,8 @@ object Log {
   /** a directory that is scheduled to be deleted */
   val DeleteDirSuffix = "-delete"
 
+  val UnknownLogStartOffset = -1L
+
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
    * so that ls sorts the files numerically.

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 761edf9..a555420 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -48,12 +48,14 @@ class LogManager(val logDirs: Array[File],
                  val cleanerConfig: CleanerConfig,
                  ioThreads: Int,
                  val flushCheckMs: Long,
-                 val flushCheckpointMs: Long,
+                 val flushRecoveryOffsetCheckpointMs: Long,
+                 val flushStartOffsetCheckpointMs: Long,
                  val retentionCheckMs: Long,
                  scheduler: Scheduler,
                  val brokerState: BrokerState,
                  time: Time) extends Logging {
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
+  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000
 
@@ -64,6 +66,7 @@ class LogManager(val logDirs: Array[File],
   createAndValidateLogDirs(logDirs)
   private val dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
+  private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, LogStartOffsetCheckpointFile)))).toMap
   loadLogs()
 
   // public, so we can access this from kafka.admin.DeleteTopicTest
@@ -139,10 +142,18 @@ class LogManager(val logDirs: Array[File],
         recoveryPoints = this.recoveryPointCheckpoints(dir).read
       } catch {
         case e: Exception =>
-          warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
+          warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e)
           warn("Resetting the recovery checkpoint to 0")
       }
 
+      var logStartOffsets = Map[TopicPartition, Long]()
+      try {
+        logStartOffsets = this.logStartOffsetCheckpoints(dir).read
+      } catch {
+        case e: Exception =>
+          warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e)
+      }
+
       val jobsForDir = for {
         dirContent <- Option(dir.listFiles).toList
         logDir <- dirContent if logDir.isDirectory
@@ -153,8 +164,9 @@ class LogManager(val logDirs: Array[File],
           val topicPartition = Log.parseTopicPartitionName(logDir)
           val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
           val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
+          val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
 
-          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
+          val current = new Log(logDir, config, logStartOffset, logRecoveryPoint, scheduler, time)
           if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
             this.logsToBeDeleted.add(current)
           } else {
@@ -210,7 +222,12 @@ class LogManager(val logDirs: Array[File],
       scheduler.schedule("kafka-recovery-point-checkpoint",
                          checkpointRecoveryPointOffsets,
                          delay = InitialTaskDelayMs,
-                         period = flushCheckpointMs,
+                         period = flushRecoveryOffsetCheckpointMs,
+                         TimeUnit.MILLISECONDS)
+      scheduler.schedule("kafka-log-start-offset-checkpoint",
+                         checkpointLogStartOffsets,
+                         delay = InitialTaskDelayMs,
+                         period = flushStartOffsetCheckpointMs,
                          TimeUnit.MILLISECONDS)
       scheduler.schedule("kafka-delete-logs",
                          deleteLogs,
@@ -263,7 +280,10 @@ class LogManager(val logDirs: Array[File],
 
         // update the last flush point
         debug("Updating recovery points at " + dir)
-        checkpointLogsInDir(dir)
+        checkpointLogRecoveryOffsetsInDir(dir)
+
+        debug("Updating log start offsets at " + dir)
+        checkpointLogStartOffsetsInDir(dir)
 
         // mark that the shutdown was clean by creating marker file
         debug("Writing clean shutdown marker at " + dir)
@@ -333,13 +353,21 @@ class LogManager(val logDirs: Array[File],
    * to avoid recovering the whole log on startup.
    */
   def checkpointRecoveryPointOffsets() {
-    this.logDirs.foreach(checkpointLogsInDir)
+    this.logDirs.foreach(checkpointLogRecoveryOffsetsInDir)
+  }
+
+  /**
+   * Write out the current log start offset for all logs to a text file in the log directory
+   * to avoid exposing data that have been deleted by DeleteRecordsRequest
+   */
+  def checkpointLogStartOffsets() {
+    this.logDirs.foreach(checkpointLogStartOffsetsInDir)
   }
 
   /**
    * Make a checkpoint for all logs in provided directory.
    */
-  private def checkpointLogsInDir(dir: File): Unit = {
+  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
     val recoveryPoints = this.logsByDir.get(dir.toString)
     if (recoveryPoints.isDefined) {
       this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
@@ -347,6 +375,17 @@ class LogManager(val logDirs: Array[File],
   }
 
   /**
+   * Checkpoint log start offset for all logs in provided directory.
+   */
+  private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
+    val logs = this.logsByDir.get(dir.toString)
+    if (logs.isDefined) {
+      this.logStartOffsetCheckpoints(dir).write(
+        logs.get.filter{case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset}.mapValues(_.logStartOffset))
+    }
+  }
+
+  /**
    * Get the log if it exists, otherwise return None
    */
   def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition))
@@ -362,7 +401,7 @@ class LogManager(val logDirs: Array[File],
         val dataDir = nextLogDir()
         val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
         dir.mkdirs()
-        val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
+        val log = new Log(dir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler, time)
         logs.put(topicPartition, log)
         info("Created log for partition [%s,%d] in %s with properties {%s}."
           .format(topicPartition.topic,
@@ -425,6 +464,7 @@ class LogManager(val logDirs: Array[File],
       val renamedDir = new File(removedLog.dir.getParent, dirName)
       val renameSuccessful = removedLog.dir.renameTo(renamedDir)
       if (renameSuccessful) {
+        checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
         removedLog.dir = renamedDir
         // change the file pointers for log and index file
         for (logSegment <- removedLog.logSegments) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 9263515..4e77625 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -387,32 +387,33 @@ class LogSegment(val log: FileRecords,
   }
 
   /**
-   * Search the message offset based on timestamp.
-   * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
-   * greater than or equals to the target timestamp.
+   * Search the message offset based on timestamp and offset.
    *
-   * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
-   * timestamp will be max timestamp in the segment.
+   * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
    *
-   * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
-   * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
+   * - If all the messages in the segment have smaller offsets, return None
+   * - If all the messages in the segment have smaller timestamps, return None
+   * - If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp
+   *   the returned offset will be max(the base offset of the segment, startingOffset) and the timestamp will be Message.NoTimestamp.
+   * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp
+   *   is greater than or equals to the target timestamp and whose offset is greater than or equals to the startingOffset.
    *
-   * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
-   * from the indexed position. This could happen if the log is truncated after we get the indexed position but
-   * before we scan the log from there. In this case we simply return None and the caller will need to check on
-   * the truncated log and maybe retry or even do the search on another log segment.
+   * This method only returns None when 1) all messages' offsets < startingOffset or 2) the log is not empty but we did not
+   * see any message when scanning the log from the indexed position. The latter could happen if the log is truncated
+   * after we get the indexed position but before we scan the log from there. In this case we simply return None and the
+   * caller will need to check on the truncated log and maybe retry or even do the search on another log segment.
    *
    * @param timestamp The timestamp to search for.
-   * @return the timestamp and offset of the first message whose timestamp is larger than or equal to the
-   *         target timestamp. None will be returned if there is no such message.
+   * @param startingOffset The starting offset to search.
+   * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message.
    */
-  def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
+  def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
     // Get the index entry with a timestamp less than or equal to the target timestamp
     val timestampOffset = timeIndex.lookup(timestamp)
-    val position = index.lookup(timestampOffset.offset).position
+    val position = index.lookup(math.max(timestampOffset.offset, startingOffset)).position
 
     // Search the timestamp
-    Option(log.searchForTimestamp(timestamp, position)).map { timestampAndOffset =>
+    Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
       TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8842724..14e56bd 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -63,13 +63,13 @@ abstract class AbstractFetcherThread(name: String,
   /* callbacks to be defined in subclass */
 
   // process fetched data
-  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
+  protected def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
 
   // handle a partition whose offset is out of range and return a new fetch offset
-  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
+  protected def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
 
   // deal with partitions with errors, potentially due to leadership changes
-  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
+  protected def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
 
   protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): REQ
 
@@ -140,17 +140,17 @@ abstract class AbstractFetcherThread(name: String,
           val partitionId = topicPartition.partition
           Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
             // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
-            if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {
+            if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.fetchOffset) {
               partitionData.error match {
                 case Errors.NONE =>
                   try {
                     val records = partitionData.toRecords
                     val newOffset = records.batches.asScala.lastOption.map(_.nextOffset).getOrElse(
-                      currentPartitionFetchState.offset)
+                      currentPartitionFetchState.fetchOffset)
 
                     fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
+                    processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData)
 
                     val validBytes = records.validBytes
                     if (validBytes > 0) {
@@ -164,18 +164,18 @@ abstract class AbstractFetcherThread(name: String,
                       // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       // should get fixed in the subsequent fetches
-                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset  + " error " + ime.getMessage)
+                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.fetchOffset  + " error " + ime.getMessage)
                       updatePartitionsWithError(topicPartition);
                     case e: Throwable =>
                       throw new KafkaException("error processing data for partition [%s,%d] offset %d"
-                        .format(topic, partitionId, currentPartitionFetchState.offset), e)
+                        .format(topic, partitionId, currentPartitionFetchState.fetchOffset), e)
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
                   try {
                     val newOffset = handleOffsetOutOfRange(topicPartition)
                     partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
                     error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
-                      .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
+                      .format(currentPartitionFetchState.fetchOffset, topic, partitionId, newOffset))
                   } catch {
                     case e: FatalExitError => throw e
                     case e: Throwable =>
@@ -226,7 +226,7 @@ abstract class AbstractFetcherThread(name: String,
       for (partition <- partitions) {
         Option(partitionStates.stateValue(partition)).foreach (currentPartitionFetchState =>
           if (currentPartitionFetchState.isActive)
-            partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
+            partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay)))
         )
       }
       partitionMapCond.signalAll()
@@ -350,11 +350,11 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId:
 /**
   * case class to keep partition offset and its state(active, inactive)
   */
-case class PartitionFetchState(offset: Long, delay: DelayedItem) {
+case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem) {
 
-  def this(offset: Long) = this(offset, new DelayedItem(0))
+  def this(fetchOffset: Long) = this(fetchOffset, new DelayedItem(0))
 
   def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
 
-  override def toString = "%d-%b".format(offset, isActive)
+  override def toString = "%d-%b".format(fetchOffset, isActive)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
new file mode 100644
index 0000000..e5b301c
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+
+import java.util.concurrent.TimeUnit
+
+import com.yammer.metrics.core.Meter
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.DeleteRecordsResponse
+
+import scala.collection._
+
+
+case class DeleteRecordsPartitionStatus(requiredOffset: Long,
+                                        responseStatus: DeleteRecordsResponse.PartitionResponse) {
+  @volatile var acksPending = false
+
+  override def toString = "[acksPending: %b, error: %s, lowWatermark: %d, requiredOffset: %d]"
+    .format(acksPending, responseStatus.error.toString, responseStatus.lowWatermark, requiredOffset)
+}
+
+/**
+ * A delayed delete records operation that can be created by the replica manager and watched
+ * in the delete records operation purgatory
+ */
+class DelayedDeleteRecords(delayMs: Long,
+                           deleteRecordsStatus:  Map[TopicPartition, DeleteRecordsPartitionStatus],
+                           replicaManager: ReplicaManager,
+                           responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit)
+  extends DelayedOperation(delayMs) {
+
+  // first update the acks pending variable according to the error code
+  deleteRecordsStatus.foreach { case (topicPartition, status) =>
+    if (status.responseStatus.error == Errors.NONE) {
+      // Timeout error state will be cleared when required acks are received
+      status.acksPending = true
+      status.responseStatus.error = Errors.REQUEST_TIMED_OUT
+    } else {
+      status.acksPending = false
+    }
+
+    trace("Initial partition status for %s is %s".format(topicPartition, status))
+  }
+
+  /**
+   * The delayed delete records operation can be completed if every partition specified in the request satisfies one of the following:
+   *
+   * 1) There was an error while checking if all replicas have caught up to the deleteRecordsOffset: set an error in the response
+   * 2) The low watermark of the partition has caught up to the deleteRecordsOffset: set the low watermark in the response
+   *
+   */
+  override def tryComplete(): Boolean = {
+    // check for each partition if it still has pending acks
+    deleteRecordsStatus.foreach { case (topicPartition, status) =>
+      trace(s"Checking delete records satisfaction for ${topicPartition}, current status $status")
+      // skip those partitions that have already been satisfied
+      if (status.acksPending) {
+        val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match {
+          case Some(partition) =>
+            partition.leaderReplicaIfLocal match {
+              case Some(_) =>
+                val leaderLW = partition.lowWatermarkIfLeader
+                (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
+              case None =>
+                (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+            }
+          case None =>
+            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+        }
+        if (error != Errors.NONE || lowWatermarkReached) {
+          status.acksPending = false
+          status.responseStatus.error = error
+          status.responseStatus.lowWatermark = lw
+        }
+      }
+    }
+
+    // check if every partition has satisfied at least one of case 1 or 2
+    if (!deleteRecordsStatus.values.exists(_.acksPending))
+      forceComplete()
+    else
+      false
+  }
+
+  override def onExpiration() {
+    deleteRecordsStatus.foreach { case (topicPartition, status) =>
+      if (status.acksPending) {
+        DelayedDeleteRecordsMetrics.recordExpiration(topicPartition)
+      }
+    }
+  }
+
+  /**
+   * Upon completion, return the current response status along with the error code per partition
+   */
+  override def onComplete() {
+    val responseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+    responseCallback(responseStatus)
+  }
+}
+
+object DelayedDeleteRecordsMetrics extends KafkaMetricsGroup {
+
+  private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
+
+  def recordExpiration(partition: TopicPartition) {
+    aggregateExpirationMeter.mark()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index a05131a..cbee78a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -150,7 +150,7 @@ class DelayedFetch(delayMs: Long,
     )
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>
-      tp -> FetchPartitionData(result.error, result.hw, result.info.records)
+      tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
     }
 
     responseCallback(fetchPartitionData)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0798efd..defbf34 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -99,6 +99,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
         case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
         case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
+        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -147,7 +148,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       val leaderAndIsrResponse =
         if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
+          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
           new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
         } else {
           val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
@@ -199,7 +200,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val updateMetadataResponse =
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-        val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
+        val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
         if (deletedPartitions.nonEmpty)
           coordinator.handleDeletedPartitions(deletedPartitions)
 
@@ -451,12 +452,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
       case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
     }
 
     val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
       case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
     }
 
     // the callback for sending a fetch response
@@ -474,17 +475,17 @@ class KafkaApis(val requestChannel: RequestChannel,
           val convertedData = replicaManager.getMagic(tp) match {
             case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
               trace(s"Down converting message to V0 for fetch request from $clientId")
-              FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
+              FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
 
             case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
               trace(s"Down converting message to V1 for fetch request from $clientId")
-              FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
+              FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
 
             case _ => data
           }
 
-          tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LSO,
-            null, convertedData.records)
+          tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+            convertedData.logStartOffset, null, convertedData.records)
         }
       }
 
@@ -728,7 +729,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new Array[(Long, Long)](segments.length)
 
     for (i <- segments.indices)
-      offsetTimeArray(i) = (segments(i).baseOffset, segments(i).lastModified)
+      offsetTimeArray(i) = (math.max(segments(i).baseOffset, log.logStartOffset), segments(i).lastModified)
     if (lastSegmentHasSize)
       offsetTimeArray(segments.length) = (log.logEndOffset, time.milliseconds)
 
@@ -1259,6 +1260,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleDeleteRecordsRequest(request: RequestChannel.Request) {
+    val deleteRecordsRequest = request.body[DeleteRecordsRequest]
+
+    val (authorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteRecordsRequest.partitionOffsets.asScala.partition {
+      case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
+    }
+
+    val (authorizedForDeleteTopics, unauthorizedForDeleteTopics) = authorizedForDescribeTopics.partition {
+      case (topicPartition, _) => authorize(request.session, Delete, new Resource(auth.Topic, topicPartition.topic))
+    }
+
+    // the callback for sending a DeleteRecordsResponse
+    def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
+
+      val mergedResponseStatus = responseStatus ++
+        unauthorizedForDeleteTopics.mapValues(_ =>
+          new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++
+        nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
+          new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION))
+
+      mergedResponseStatus.foreach { case (topicPartition, status) =>
+        if (status.error != Errors.NONE) {
+          debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format(
+            request.header.correlationId,
+            request.header.clientId,
+            topicPartition,
+            status.error.exceptionName))
+        }
+      }
+
+      val respBody = new DeleteRecordsResponse(mergedResponseStatus.asJava)
+      requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
+
+      // When this callback is triggered, the remote API call has completed
+      request.apiRemoteCompleteTimeMs = time.milliseconds
+    }
+
+    if (authorizedForDeleteTopics.isEmpty)
+      sendResponseCallback(Map.empty)
+    else {
+      // call the replica manager to delete records from the replicas
+      replicaManager.deleteRecords(
+        deleteRecordsRequest.timeout.toLong,
+        authorizedForDeleteTopics.mapValues(_.toLong),
+        sendResponseCallback)
+    }
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 879bc51..fe6631e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -100,6 +100,7 @@ object Defaults {
   val LogDeleteDelayMs = 60000
   val LogFlushSchedulerIntervalMs = Long.MaxValue
   val LogFlushOffsetCheckpointIntervalMs = 60000
+  val LogFlushStartOffsetCheckpointIntervalMs = 60000
   val LogPreAllocateEnable = false
   // lazy val as `InterBrokerProtocolVersion` is defined later
   lazy val LogMessageFormatVersion = InterBrokerProtocolVersion
@@ -125,6 +126,7 @@ object Defaults {
   val ReplicaHighWatermarkCheckpointIntervalMs = 5000L
   val FetchPurgatoryPurgeIntervalRequests = 1000
   val ProducerPurgatoryPurgeIntervalRequests = 1000
+  val DeleteRecordsPurgatoryPurgeIntervalRequests = 1
   val AutoLeaderRebalanceEnable = true
   val LeaderImbalancePerBrokerPercentage = 10
   val LeaderImbalanceCheckIntervalSeconds = 300
@@ -273,6 +275,7 @@ object KafkaConfig {
   val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
   val LogFlushIntervalMsProp = "log.flush.interval.ms"
   val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
+  val LogFlushStartOffsetCheckpointIntervalMsProp = "log.flush.start.offset.checkpoint.interval.ms"
   val LogPreAllocateProp = "log.preallocate"
   val LogMessageFormatVersionProp = LogConfigPrefix + "message.format.version"
   val LogMessageTimestampTypeProp = LogConfigPrefix + "message.timestamp.type"
@@ -296,6 +299,7 @@ object KafkaConfig {
   val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
   val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
   val ProducerPurgatoryPurgeIntervalRequestsProp = "producer.purgatory.purge.interval.requests"
+  val DeleteRecordsPurgatoryPurgeIntervalRequestsProp = "delete.records.purgatory.purge.interval.requests"
   val AutoLeaderRebalanceEnableProp = "auto.leader.rebalance.enable"
   val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage"
   val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds"
@@ -468,6 +472,7 @@ object KafkaConfig {
   val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk"
   val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in " + LogFlushSchedulerIntervalMsProp + " is used"
   val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point"
+  val LogFlushStartOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of log start offset"
   val LogPreAllocateEnableDoc = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true."
   val LogMessageFormatVersionDoc = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid ApiVersion. " +
     "Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format version, the " +
@@ -521,6 +526,7 @@ object KafkaConfig {
   val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which the high watermark is saved out to disk"
   val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory"
   val ProducerPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the producer request purgatory"
+  val DeleteRecordsPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the delete records request purgatory"
   val AutoLeaderRebalanceEnableDoc = "Enables auto leader balancing. A background thread checks and triggers leader balance if required at regular intervals"
   val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage."
   val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller"
@@ -686,6 +692,7 @@ object KafkaConfig {
       .define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc)
       .define(LogFlushIntervalMsProp, LONG, null, HIGH, LogFlushIntervalMsDoc)
       .define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc)
+      .define(LogFlushStartOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushStartOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushStartOffsetCheckpointIntervalMsDoc)
       .define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc)
       .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
       .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
@@ -710,6 +717,7 @@ object KafkaConfig {
       .define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc)
       .define(FetchPurgatoryPurgeIntervalRequestsProp, INT, Defaults.FetchPurgatoryPurgeIntervalRequests, MEDIUM, FetchPurgatoryPurgeIntervalRequestsDoc)
       .define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc)
+      .define(DeleteRecordsPurgatoryPurgeIntervalRequestsProp, INT, Defaults.DeleteRecordsPurgatoryPurgeIntervalRequests, MEDIUM, DeleteRecordsPurgatoryPurgeIntervalRequestsDoc)
       .define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc)
       .define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc)
       .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
@@ -862,6 +870,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
   val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
   val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
+  val logFlushStartOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp).toLong
   val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
   val logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
   val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
@@ -907,6 +916,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
   val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
   val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
+  val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp)
   val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
   val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
   val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 465b0b7..0d3e49c 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -297,7 +297,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   }
 
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
-    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower)
+    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower, metadataCache)
 
   private def initZk(): ZkUtils = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
@@ -655,7 +655,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                    cleanerConfig = cleanerConfig,
                    ioThreads = config.numRecoveryThreadsPerDataDir,
                    flushCheckMs = config.logFlushSchedulerIntervalMs,
-                   flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+                   flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+                   flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
                    retentionCheckMs = config.logCleanupIntervalMs,
                    scheduler = kafkaScheduler,
                    brokerState = brokerState,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 9a6090d..bf36974 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -132,6 +132,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def isBrokerAlive(brokerId: Int): Boolean = {
+    inReadLock(partitionMetadataLock) {
+      aliveBrokers.contains(brokerId)
+    }
+  }
+
   def getAliveBrokers: Seq[Broker] = {
     inReadLock(partitionMetadataLock) {
       aliveBrokers.values.toBuffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 29a2467..5f055a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -60,7 +60,8 @@ class ReplicaFetcherThread(name: String,
   type PD = PartitionData
 
   private val fetchRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
@@ -136,10 +137,12 @@ class ReplicaFetcherThread(name: String,
         trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
           .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
       val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
+      val leaderLogStartOffset = partitionData.logStartOffset
       // for the follower replica, we do not need to keep
       // its segment base offset the physical position,
       // these values will be computed upon making the leader
       replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
+      replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
       if (logger.isTraceEnabled)
         trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
       if (quota.isThrottled(topicPartition))
@@ -289,8 +292,10 @@ class ReplicaFetcherThread(name: String,
 
     partitionMap.foreach { case (topicPartition, partitionFetchState) =>
       // We will not include a replica in the fetch request if it should be throttled.
-      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
-        requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
+      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition)) {
+        val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
+        requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
+      }
     }
 
     val requestBuilder = JFetchRequest.Builder.forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, requestMap)
@@ -313,7 +318,7 @@ object ReplicaFetcherThread {
   private[server] class FetchRequest(val underlying: JFetchRequest.Builder) extends AbstractFetcherThread.FetchRequest {
     def isEmpty: Boolean = underlying.fetchData().isEmpty
     def offset(topicPartition: TopicPartition): Long =
-      underlying.fetchData().asScala(topicPartition).offset
+      underlying.fetchData().asScala(topicPartition).fetchOffset
   }
 
   private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
@@ -326,6 +331,8 @@ object ReplicaFetcherThread {
 
     def highWatermark: Long = underlying.highWatermark
 
+    def logStartOffset: Long = underlying.logStartOffset
+
     def exception: Option[Throwable] = error match {
       case Errors.NONE => None
       case e => Some(e.exception)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5ba093e..8f67425 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,12 +29,13 @@ import kafka.log.{Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils._
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, PolicyViolationException}
+import org.apache.kafka.common.errors.{NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest, DeleteRecordsRequest, DeleteRecordsResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -52,6 +53,13 @@ case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = N
   }
 }
 
+case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exception: Option[Throwable] = None) {
+  def error: Errors = exception match {
+    case None => Errors.NONE
+    case Some(e) => Errors.forException(e)
+  }
+}
+
 /*
  * Result metadata of a log read operation on the log
  * @param info @FetchDataInfo returned by the @Log read
@@ -63,7 +71,9 @@ case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = N
  */
 case class LogReadResult(info: FetchDataInfo,
                          hw: Long,
+                         leaderLogStartOffset: Long,
                          leaderLogEndOffset: Long,
+                         followerLogStartOffset: Long,
                          fetchTimeMs: Long,
                          readSize: Int,
                          exception: Option[Throwable] = None) {
@@ -74,16 +84,19 @@ case class LogReadResult(info: FetchDataInfo,
   }
 
   override def toString =
-    s"Fetch Data: [$info], HW: [$hw], leaderLogEndOffset: [$leaderLogEndOffset], readSize: [$readSize], error: [$error]"
+    s"Fetch Data: [$info], HW: [$hw], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
+    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], error: [$error]"
 
 }
 
-case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, records: Records)
+case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records)
 
 object LogReadResult {
   val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                                            hw = -1L,
+                                           leaderLogStartOffset = -1L,
                                            leaderLogEndOffset = -1L,
+                                           followerLogStartOffset = -1L,
                                            fetchTimeMs = -1L,
                                            readSize = -1)
 }
@@ -109,6 +122,7 @@ class ReplicaManager(val config: KafkaConfig,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
                      quotaManager: ReplicationQuotaManager,
+                     val metadataCache: MetadataCache,
                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
@@ -130,6 +144,8 @@ class ReplicaManager(val config: KafkaConfig,
     purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests)
   val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
     purgatoryName = "Fetch", localBrokerId, config.fetchPurgatoryPurgeIntervalRequests)
+  val delayedDeleteRecordsPurgatory = DelayedOperationPurgatory[DelayedDeleteRecords](
+    purgatoryName = "DeleteRecords", localBrokerId, config.deleteRecordsPurgatoryPurgeIntervalRequests)
 
   val leaderCount = newGauge(
     "LeaderCount",
@@ -212,6 +228,15 @@ class ReplicaManager(val config: KafkaConfig,
     debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
   }
 
+  /**
+   * Try to complete some delayed DeleteRecordsRequest with the request key;
+   * this needs to be triggered when the partition low watermark has changed
+   */
+  def tryCompleteDelayedDeleteRecords(key: DelayedOperationKey) {
+    val completed = delayedDeleteRecordsPurgatory.checkAndComplete(key)
+    debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed))
+  }
+
   def startup() {
     // start ISR expiration thread
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
@@ -316,7 +341,7 @@ class ReplicaManager(val config: KafkaConfig,
                   new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
       }
 
-      if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
+      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
         val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
@@ -345,14 +370,108 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Delete records on the local leader replicas of the given partitions; the caller (deleteRecords) may then wait for the delete records operation to be propagated to other replicas,
+   * and its callback function will be triggered either on timeout or when the logStartOffset of all live replicas has reached the specified offset
+   */
+  private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
+    trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
+    offsetPerPartition.map { case (topicPartition, requestedOffset) =>
+      // reject delete records operation on internal topics
+      if (Topic.isInternal(topicPartition.topic)) {
+        (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
+      } else {
+        try {
+          val partition = getPartition(topicPartition).getOrElse(
+            throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId)))
+          val convertedOffset =
+            if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) {
+              partition.leaderReplicaIfLocal match {
+                case Some(leaderReplica) =>
+                  leaderReplica.highWatermark.messageOffset
+                case None =>
+                  throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+                    .format(topicPartition, localBrokerId))
+              }
+            } else
+              requestedOffset
+          if (convertedOffset < 0)
+            throw new OffsetOutOfRangeException(s"The offset $convertedOffset for partition $topicPartition is not valid")
+
+          val lowWatermark = partition.deleteRecordsOnLeader(convertedOffset)
+          (topicPartition, LogDeleteRecordsResult(convertedOffset, lowWatermark))
+        } catch {
+          // NOTE: the known exceptions listed below are returned as the partition's result without being logged;
+          // error logging is supposed to indicate un-expected failures of a broker in handling a delete records request
+          case e: KafkaStorageException =>
+            fatal("Halting due to unrecoverable I/O error while handling DeleteRecordsRequest: ", e)
+            Runtime.getRuntime.halt(1)
+            (topicPartition, null)
+          case e@ (_: UnknownTopicOrPartitionException |
+                   _: NotLeaderForPartitionException |
+                   _: OffsetOutOfRangeException |
+                   _: PolicyViolationException |
+                   _: NotEnoughReplicasException) =>
+            (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(e)))
+          case t: Throwable =>
+            error("Error processing delete records operation on partition %s".format(topicPartition), t)
+            (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(t)))
+        }
+      }
+    }
+  }
+
+  // If there exists a topic partition that meets both of the following requirements,
+  // we need to put a delayed DeleteRecordsRequest and wait for the delete records operation to complete
+  //
+  // 1. the delete records operation on this partition is successful
+  // 2. low watermark of this partition is smaller than the specified offset
+  private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = {
+    localDeleteRecordsResults.exists{ case (tp, deleteRecordsResult) =>
+      deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset
+    }
+  }
+
+  def deleteRecords(timeout: Long,
+                    offsetPerPartition: Map[TopicPartition, Long],
+                    responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit) {
+    val timeBeforeLocalDeleteRecords = time.milliseconds
+    val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
+    debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))
+
+    val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
+      topicPartition ->
+        DeleteRecordsPartitionStatus(
+          result.requestedOffset, // requested offset
+          new DeleteRecordsResponse.PartitionResponse(result.lowWatermark, result.error)) // response status
+    }
+
+    if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
+      // create delayed delete records operation
+      val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
+
+      // create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
+      val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+
+      // try to complete the request immediately, otherwise put it into the purgatory
+      // this is because while the delayed delete records operation is being created, new
+      // requests may arrive and hence make this operation completable.
+      delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys)
+    } else {
+      // we can respond immediately
+      val deleteRecordsResponseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+      responseCallback(deleteRecordsResponseStatus)
+    }
+  }
+
   // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
   //
   // 1. required acks = -1
   // 2. there is data to append
   // 3. at least one partition append was successful (fewer errors than partitions)
-  private def delayedRequestRequired(requiredAcks: Short,
-                                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                                     localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
+  private def delayedProduceRequestRequired(requiredAcks: Short,
+                                            entriesPerPartition: Map[TopicPartition, MemoryRecords],
+                                            localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
     requiredAcks == -1 &&
     entriesPerPartition.nonEmpty &&
     localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
@@ -471,7 +590,7 @@ class ReplicaManager(val config: KafkaConfig,
     //                        4) some error happens while reading data
     if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
-        tp -> FetchPartitionData(result.error, result.hw, result.info.records)
+        tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
       }
       responseCallback(fetchPartitionData)
     } else {
@@ -508,8 +627,9 @@ class ReplicaManager(val config: KafkaConfig,
                        quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
 
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
-      val offset = fetchInfo.offset
+      val offset = fetchInfo.fetchOffset
       val partitionFetchSize = fetchInfo.maxBytes
+      val followerLogStartOffset = fetchInfo.logStartOffset
 
       BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
       BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
@@ -539,6 +659,7 @@ class ReplicaManager(val config: KafkaConfig,
          */
         val initialLogEndOffset = localReplica.logEndOffset.messageOffset
         val initialHighWatermark = localReplica.highWatermark.messageOffset
+        val initialLogStartOffset = localReplica.logStartOffset
         val fetchTimeMs = time.milliseconds
         val logReadInfo = localReplica.log match {
           case Some(log) =>
@@ -563,7 +684,9 @@ class ReplicaManager(val config: KafkaConfig,
 
         LogReadResult(info = logReadInfo,
                       hw = initialHighWatermark,
+                      leaderLogStartOffset = initialLogStartOffset,
                       leaderLogEndOffset = initialLogEndOffset,
+                      followerLogStartOffset = followerLogStartOffset,
                       fetchTimeMs = fetchTimeMs,
                       readSize = partitionFetchSize,
                       exception = None)
@@ -576,7 +699,9 @@ class ReplicaManager(val config: KafkaConfig,
                  _: OffsetOutOfRangeException) =>
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                         hw = -1L,
+                        leaderLogStartOffset = -1L,
                         leaderLogEndOffset = -1L,
+                        followerLogStartOffset = -1L,
                         fetchTimeMs = -1L,
                         readSize = partitionFetchSize,
                         exception = Some(e))
@@ -586,7 +711,9 @@ class ReplicaManager(val config: KafkaConfig,
           error(s"Error processing fetch operation on partition $tp, offset $offset", e)
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                         hw = -1L,
+                        leaderLogStartOffset = -1L,
                         leaderLogEndOffset = -1L,
+                        followerLogStartOffset = -1L,
                         fetchTimeMs = -1L,
                         readSize = partitionFetchSize,
                         exception = Some(e))
@@ -622,7 +749,7 @@ class ReplicaManager(val config: KafkaConfig,
   def getMagic(topicPartition: TopicPartition): Option[Byte] =
     getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
 
-  def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] =  {
+  def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
       if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
         val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
@@ -640,7 +767,6 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest,
-                             metadataCache: MetadataCache,
                              onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
     leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
@@ -695,7 +821,7 @@ class ReplicaManager(val config: KafkaConfig,
         else
           Set.empty[Partition]
         val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
+          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
         else
           Set.empty[Partition]
 
@@ -801,8 +927,7 @@ class ReplicaManager(val config: KafkaConfig,
                             epoch: Int,
                             partitionState: Map[Partition, PartitionState],
                             correlationId: Int,
-                            responseMap: mutable.Map[TopicPartition, Errors],
-                            metadataCache: MetadataCache) : Set[Partition] = {
+                            responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
     partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
         "starting the become-follower transition for partition %s")