You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/08/06 06:28:12 UTC

[2/2] git commit: kafka-1430; Purgatory redesign; patched by Guozhang Wang; reviewed by Jun Rao

kafka-1430; Purgatory redesign; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0dc243b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0dc243b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0dc243b9

Branch: refs/heads/trunk
Commit: 0dc243b92a6ae1683795caf8222edc1b2bb49565
Parents: 7a67a72
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Tue Aug 5 21:27:57 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Aug 5 21:27:57 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/FetchRequest.scala |   6 +-
 .../main/scala/kafka/api/FetchResponse.scala    |   3 +-
 .../main/scala/kafka/cluster/Partition.scala    | 161 ++++---
 core/src/main/scala/kafka/cluster/Replica.scala |  80 ++--
 .../scala/kafka/consumer/SimpleConsumer.scala   |   2 +-
 core/src/main/scala/kafka/log/Log.scala         |  74 ++-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  13 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  38 +-
 .../kafka/server/AbstractFetcherThread.scala    |  17 +-
 .../main/scala/kafka/server/DelayedFetch.scala  |  91 ++++
 .../scala/kafka/server/DelayedProduce.scala     | 115 +++++
 .../scala/kafka/server/DelayedRequestKey.scala  |  38 ++
 .../main/scala/kafka/server/FetchDataInfo.scala |  22 +
 .../kafka/server/FetchRequestPurgatory.scala    |  69 +++
 .../src/main/scala/kafka/server/KafkaApis.scala | 446 +++----------------
 .../scala/kafka/server/LogOffsetMetadata.scala  |  87 ++++
 .../main/scala/kafka/server/OffsetManager.scala |  29 +-
 .../kafka/server/ProducerRequestPurgatory.scala |  69 +++
 .../kafka/server/ReplicaFetcherThread.scala     |  25 +-
 .../scala/kafka/server/ReplicaManager.scala     | 183 ++++++--
 .../scala/kafka/server/RequestPurgatory.scala   |  98 ++--
 .../scala/kafka/tools/TestEndToEndLatency.scala |  23 +-
 .../test/scala/other/kafka/StressTestLog.scala  |   2 +-
 .../kafka/integration/PrimitiveApiTest.scala    |  10 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   4 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  16 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  26 +-
 .../kafka/message/BaseMessageSetTestCases.scala |   2 +-
 .../server/HighwatermarkPersistenceTest.scala   |  38 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   8 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |  10 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |   4 +-
 .../kafka/server/RequestPurgatoryTest.scala     |  26 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |  58 ++-
 34 files changed, 1149 insertions(+), 744 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 55a5982..51cdccf 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -17,16 +17,16 @@
 
 package kafka.api
 
-import java.nio.ByteBuffer
 import kafka.utils.nonthreadsafe
 import kafka.api.ApiUtils._
-import scala.collection.immutable.Map
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.ConsumerConfig
-import java.util.concurrent.atomic.AtomicInteger
 import kafka.network.RequestChannel
 import kafka.message.MessageSet
 
+import java.util.concurrent.atomic.AtomicInteger
+import java.nio.ByteBuffer
+import scala.collection.immutable.Map
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index d117f10..af93087 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -19,6 +19,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import java.nio.channels.GatheringByteChannel
+
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.network.{MultiSend, Send}
@@ -151,7 +152,7 @@ object FetchResponse {
 
 
 case class FetchResponse(correlationId: Int,
-                         data: Map[TopicAndPartition, FetchResponsePartitionData])  {
+                         data: Map[TopicAndPartition, FetchResponsePartitionData]) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 134aef9..ff106b4 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
 import kafka.utils._
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
-import kafka.server.{OffsetManager, ReplicaManager}
+import kafka.server.{TopicPartitionRequestKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
@@ -29,7 +29,8 @@ import kafka.message.ByteBufferMessageSet
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.utils.Utils.{inReadLock,inWriteLock}
-import scala.collection._
+import scala.Some
+import scala.collection.immutable.Set
 
 import com.yammer.metrics.core.Gauge
 
@@ -39,18 +40,18 @@ import com.yammer.metrics.core.Gauge
  */
 class Partition(val topic: String,
                 val partitionId: Int,
-                var replicationFactor: Int,
                 time: Time,
-                val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
+                replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   private val localBrokerId = replicaManager.config.brokerId
   private val logManager = replicaManager.logManager
   private val zkClient = replicaManager.zkClient
-  var leaderReplicaIdOpt: Option[Int] = None
-  var inSyncReplicas: Set[Replica] = Set.empty[Replica]
-  private val assignedReplicaMap = new Pool[Int,Replica]
+  private val assignedReplicaMap = new Pool[Int, Replica]
+  // The read lock is only required when multiple reads are executed and need to be mutually consistent
   private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
-  private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+  @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+  @volatile var leaderReplicaIdOpt: Option[Int] = None
+  @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
   /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
    * One way of doing that is through the controller's start replica state change command. When a new broker starts up
    * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
@@ -58,7 +59,6 @@ class Partition(val topic: String,
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
-  private val stateChangeLogger = KafkaController.stateChangeLogger
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
@@ -72,13 +72,11 @@ class Partition(val topic: String,
   )
 
   def isUnderReplicated(): Boolean = {
-    inReadLock(leaderIsrUpdateLock) {
-      leaderReplicaIfLocal() match {
-        case Some(_) =>
-          inSyncReplicas.size < assignedReplicas.size
-        case None =>
-          false
-      }
+    leaderReplicaIfLocal() match {
+      case Some(_) =>
+        inSyncReplicas.size < assignedReplicas.size
+      case None =>
+        false
     }
   }
 
@@ -114,15 +112,13 @@ class Partition(val topic: String,
   }
 
   def leaderReplicaIfLocal(): Option[Replica] = {
-    inReadLock(leaderIsrUpdateLock) {
-      leaderReplicaIdOpt match {
-        case Some(leaderReplicaId) =>
-          if (leaderReplicaId == localBrokerId)
-            getReplica(localBrokerId)
-          else
-            None
-        case None => None
-      }
+    leaderReplicaIdOpt match {
+      case Some(leaderReplicaId) =>
+        if (leaderReplicaId == localBrokerId)
+          getReplica(localBrokerId)
+        else
+          None
+      case None => None
     }
   }
 
@@ -155,9 +151,7 @@ class Partition(val topic: String,
   }
 
   def getLeaderEpoch(): Int = {
-    inReadLock(leaderIsrUpdateLock) {
-      return this.leaderEpoch
-    }
+    return this.leaderEpoch
   }
 
   /**
@@ -179,14 +173,17 @@ class Partition(val topic: String,
       val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
-      // reset LogEndOffset for remote replicas
-      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
       inSyncReplicas = newInSyncReplicas
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
       leaderReplicaIdOpt = Some(localBrokerId)
+      // construct the high watermark metadata for the new leader replica
+      val newLeaderReplica = getReplica().get
+      newLeaderReplica.convertHWToLocalOffsetMetadata()
+      // reset log end offset for remote replicas
+      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
       // we may need to increment high watermark since ISR could be down to 1
-      maybeIncrementLeaderHW(getReplica().get)
+      maybeIncrementLeaderHW(newLeaderReplica)
       if (topic == OffsetManager.OffsetsTopicName)
         offsetManager.loadOffsetsFromLog(partitionId)
       true
@@ -233,18 +230,8 @@ class Partition(val topic: String,
     }
   }
 
-  def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
+  def updateLeaderHWAndMaybeExpandIsr(replicaId: Int) {
     inWriteLock(leaderIsrUpdateLock) {
-      debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
-      val replicaOpt = getReplica(replicaId)
-      if(!replicaOpt.isDefined) {
-        throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" +
-          " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
-            offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
-      }
-      val replica = replicaOpt.get
-      replica.logEndOffset = offset
-
       // check if this replica needs to be added to the ISR
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
@@ -253,8 +240,10 @@ class Partition(val topic: String,
           // For a replica to get added back to ISR, it has to satisfy 3 conditions-
           // 1. It is not already in the ISR
           // 2. It is part of the assigned replica list. See KAFKA-1097
-          // 3. It's log end offset >= leader's highwatermark
-          if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) {
+          // 3. Its log end offset >= leader's high watermark
+          if (!inSyncReplicas.contains(replica) &&
+            assignedReplicas.map(_.brokerId).contains(replicaId) &&
+            replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
             info("Expanding ISR for partition [%s,%d] from %s to %s"
@@ -270,29 +259,29 @@ class Partition(val topic: String,
   }
 
   def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
-    inReadLock(leaderIsrUpdateLock) {
-      leaderReplicaIfLocal() match {
-        case Some(_) =>
-          val numAcks = inSyncReplicas.count(r => {
-            if (!r.isLocal)
-              r.logEndOffset >= requiredOffset
-            else
-              true /* also count the local (leader) replica */
-          })
-          trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
-          if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
-            (requiredAcks > 0 && numAcks >= requiredAcks)) {
-            /*
-            * requiredAcks < 0 means acknowledge after all replicas in ISR
-            * are fully caught up to the (local) leader's offset
-            * corresponding to this produce request.
-            */
-            (true, ErrorMapping.NoError)
-          } else
-            (false, ErrorMapping.NoError)
-        case None =>
-          (false, ErrorMapping.NotLeaderForPartitionCode)
-      }
+    leaderReplicaIfLocal() match {
+      case Some(leaderReplica) =>
+        // keep the current immutable replica list reference
+        val curInSyncReplicas = inSyncReplicas
+        val numAcks = curInSyncReplicas.count(r => {
+          if (!r.isLocal)
+            r.logEndOffset.messageOffset >= requiredOffset
+          else
+            true /* also count the local (leader) replica */
+        })
+        trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
+        if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) ||
+          (requiredAcks > 0 && numAcks >= requiredAcks)) {
+          /*
+          * requiredAcks < 0 means acknowledge after all replicas in ISR
+          * are fully caught up to the (local) leader's offset
+          * corresponding to this produce request.
+          */
+          (true, ErrorMapping.NoError)
+        } else
+          (false, ErrorMapping.NoError)
+      case None =>
+        (false, ErrorMapping.NotLeaderForPartitionCode)
     }
   }
 
@@ -302,15 +291,19 @@ class Partition(val topic: String,
    */
   private def maybeIncrementLeaderHW(leaderReplica: Replica) {
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
-    val newHighWatermark = allLogEndOffsets.min
+    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
-    if(newHighWatermark > oldHighWatermark) {
+    if(oldHighWatermark.precedes(newHighWatermark)) {
       leaderReplica.highWatermark = newHighWatermark
-      debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark))
+      debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
+      // some delayed requests may be unblocked after HW changed
+      val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId)
+      replicaManager.unblockDelayedFetchRequests(requestKey)
+      replicaManager.unblockDelayedProduceRequests(requestKey)
+    } else {
+      debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
+        .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
     }
-    else
-      debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are %s"
-        .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long,  replicaMaxLagMessages: Long) {
@@ -349,7 +342,9 @@ class Partition(val topic: String,
     if(stuckReplicas.size > 0)
       debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     // Case 2 above
-    val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
+    val slowReplicas = candidateReplicas.filter(r =>
+      r.logEndOffset.messageOffset >= 0 &&
+      leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
     if(slowReplicas.size > 0)
       debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
     stuckReplicas ++ slowReplicas
@@ -362,6 +357,8 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val log = leaderReplica.log.get
           val info = log.append(messages, assignOffsets = true)
+          // probably unblock some follower fetch requests since log end offset has been updated
+          replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
           maybeIncrementLeaderHW(leaderReplica)
           info
@@ -399,14 +396,12 @@ class Partition(val topic: String,
   }
 
   override def toString(): String = {
-    inReadLock(leaderIsrUpdateLock) {
-      val partitionString = new StringBuilder
-      partitionString.append("Topic: " + topic)
-      partitionString.append("; Partition: " + partitionId)
-      partitionString.append("; Leader: " + leaderReplicaIdOpt)
-      partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
-      partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
-      partitionString.toString()
-    }
+    val partitionString = new StringBuilder
+    partitionString.append("Topic: " + topic)
+    partitionString.append("; Partition: " + partitionId)
+    partitionString.append("; Leader: " + leaderReplicaIdOpt)
+    partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+    partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+    partitionString.toString()
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 5e659b4..bd13c20 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -19,8 +19,9 @@ package kafka.cluster
 
 import kafka.log.Log
 import kafka.utils.{SystemTime, Time, Logging}
+import kafka.server.LogOffsetMetadata
 import kafka.common.KafkaException
-import kafka.server.ReplicaManager
+
 import java.util.concurrent.atomic.AtomicLong
 
 class Replica(val brokerId: Int,
@@ -28,33 +29,17 @@ class Replica(val brokerId: Int,
               time: Time = SystemTime,
               initialHighWatermarkValue: Long = 0L,
               val log: Option[Log] = None) extends Logging {
-  //only defined in local replica
-  private[this] var highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue)
-  // only used for remote replica; logEndOffsetValue for local replica is kept in log
-  private[this] var logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset)
-  private[this] var logEndOffsetUpdateTimeMsValue: AtomicLong = new AtomicLong(time.milliseconds)
+  // the high watermark offset value, in non-leader replicas only its message offsets are kept
+  @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
+  // the log end offset value, kept in all replicas;
+  // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
+  @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
+  // the time when log offset is updated
+  private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds)
+
   val topic = partition.topic
   val partitionId = partition.partitionId
 
-  def logEndOffset_=(newLogEndOffset: Long) {
-    if (!isLocal) {
-      logEndOffsetValue.set(newLogEndOffset)
-      logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
-      trace("Setting log end offset for replica %d for partition [%s,%d] to %d"
-            .format(brokerId, topic, partitionId, logEndOffsetValue.get()))
-    } else
-      throw new KafkaException("Shouldn't set logEndOffset for replica %d partition [%s,%d] since it's local"
-          .format(brokerId, topic, partitionId))
-
-  }
-
-  def logEndOffset = {
-    if (isLocal)
-      log.get.logEndOffset
-    else
-      logEndOffsetValue.get()
-  }
-  
   def isLocal: Boolean = {
     log match {
       case Some(l) => true
@@ -62,24 +47,43 @@ class Replica(val brokerId: Int,
     }
   }
 
-  def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
-
-  def highWatermark_=(newHighWatermark: Long) {
+  def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
     if (isLocal) {
-      trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d"
-              .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
-      highWatermarkValue.set(newHighWatermark)
-    } else
-      throw new KafkaException("Unable to set highwatermark for replica %d partition [%s,%d] since it's not local"
-              .format(brokerId, topic, partitionId))
+      throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId))
+    } else {
+      logEndOffsetMetadata = newLogEndOffset
+      logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
+      trace("Setting log end offset for replica %d for partition [%s,%d] to [%s]"
+        .format(brokerId, topic, partitionId, logEndOffsetMetadata))
+    }
   }
 
-  def highWatermark = {
+  def logEndOffset =
     if (isLocal)
-      highWatermarkValue.get()
+      log.get.logEndOffsetMetadata
     else
-      throw new KafkaException("Unable to get highwatermark for replica %d partition [%s,%d] since it's not local"
-              .format(brokerId, topic, partitionId))
+      logEndOffsetMetadata
+
+  def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
+
+  def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
+    if (isLocal) {
+      highWatermarkMetadata = newHighWatermark
+      trace("Setting high watermark for replica %d partition [%s,%d] on broker %d to [%s]"
+        .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
+    } else {
+      throw new KafkaException("Should not set high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
+    }
+  }
+
+  def highWatermark = highWatermarkMetadata
+
+  def convertHWToLocalOffsetMetadata() = {
+    if (isLocal) {
+      highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+    } else {
+      throw new KafkaException("Should not construct complete high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
+    }
   }
 
   override def equals(that: Any): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 0e64632..8db9203 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -71,7 +71,7 @@ class SimpleConsumer(val host: String,
         response = blockingChannel.receive()
       } catch {
         case e : Throwable =>
-          info("Reconnect due to socket error: %s".format(e.getMessage))
+          info("Reconnect due to socket error: %s".format(e.toString))
           // retry once
           try {
             reconnect()

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b7bc5ff..0ddf97b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -21,7 +21,7 @@ import kafka.utils._
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.BrokerTopicStats
+import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
 
 import java.io.{IOException, File}
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
@@ -51,11 +51,11 @@ import com.yammer.metrics.core.Gauge
 class Log(val dir: File,
           @volatile var config: LogConfig,
           @volatile var recoveryPoint: Long = 0L,
-          val scheduler: Scheduler,
+          scheduler: Scheduler,
           time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
-  
+
   /* A lock that guards all modifications to the log */
   private val lock = new Object
 
@@ -67,7 +67,7 @@ class Log(val dir: File,
   loadSegments()
   
   /* Calculate the offset of the next message */
-  private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
+  @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
 
   val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
 
@@ -167,6 +167,10 @@ class Log(val dir: File,
     for (s <- logSegments)
       s.index.sanityCheck()
   }
+
+  private def updateLogEndOffset(messageOffset: Long) {
+    nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
+  }
   
   private def recoverLog() {
     // if we have the clean shutdown marker, skip recovery
@@ -246,14 +250,14 @@ class Log(val dir: File,
     try {
       // they are valid, insert them in the log
       lock synchronized {
-        appendInfo.firstOffset = nextOffset.get
+        appendInfo.firstOffset = nextOffsetMetadata.messageOffset
 
         // maybe roll the log if this segment is full
         val segment = maybeRoll()
 
         if(assignOffsets) {
           // assign offsets to the message set
-          val offset = new AtomicLong(nextOffset.get)
+          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
           try {
             validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
           } catch {
@@ -262,7 +266,7 @@ class Log(val dir: File,
           appendInfo.lastOffset = offset.get - 1
         } else {
           // we are taking the offsets we are given
-          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
+          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
             throw new IllegalArgumentException("Out of order offsets found in " + messages)
         }
 
@@ -282,10 +286,10 @@ class Log(val dir: File,
         segment.append(appendInfo.firstOffset, validMessages)
 
         // increment the log end offset
-        nextOffset.set(appendInfo.lastOffset + 1)
+        updateLogEndOffset(appendInfo.lastOffset + 1)
 
         trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
-                .format(this.name, appendInfo.firstOffset, nextOffset.get(), validMessages))
+                .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
 
         if(unflushedMessages >= config.flushInterval)
           flush()
@@ -307,7 +311,7 @@ class Log(val dir: File,
    * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
    */
   case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
-  
+
   /**
    * Validate the following:
    * <ol>
@@ -387,20 +391,21 @@ class Log(val dir: File,
 
   /**
    * Read messages from the log
+   *
    * @param startOffset The offset to begin reading at
    * @param maxLength The maximum number of bytes to read
    * @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
    * 
    * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
-   * @return The messages read
+   * @return The fetch data information including fetch starting offset metadata and messages read
    */
-  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
+  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
 
     // check if the offset is valid and in range
-    val next = nextOffset.get
+    val next = nextOffsetMetadata.messageOffset
     if(startOffset == next)
-      return MessageSet.Empty
+      return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
     
     var entry = segments.floorEntry(startOffset)
       
@@ -412,15 +417,31 @@ class Log(val dir: File,
     // but if that segment doesn't contain any messages with an offset greater than that
     // continue to read from successive segments until we get some messages or we reach the end of the log
     while(entry != null) {
-      val messages = entry.getValue.read(startOffset, maxOffset, maxLength)
-      if(messages == null)
+      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
+      if(fetchInfo == null) {
         entry = segments.higherEntry(entry.getKey)
-      else
-        return messages
+      } else {
+        return fetchInfo
+      }
     }
     
-    // okay we are beyond the end of the last segment but less than the log end offset
-    MessageSet.Empty
+    // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
+    // this can happen when all messages with offsets larger than the start offset have been deleted.
+    // In this case, we will return the empty set with log end offset metadata
+    FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
+  }
+
+  /**
+   * Given a message offset, find its corresponding offset metadata in the log.
+   * If the message offset is out of range, return unknown offset metadata
+   */
+  def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+    try {
+      val fetchDataInfo = read(offset, 1)
+      fetchDataInfo.fetchOffset
+    } catch {
+      case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+    }
   }
 
   /**
@@ -433,7 +454,7 @@ class Log(val dir: File,
     // find any segments that match the user-supplied predicate UNLESS it is the final segment 
     // and it is empty (since we would just end up re-creating it
     val lastSegment = activeSegment
-    var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
+    val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
     val numToDelete = deletable.size
     if(numToDelete > 0) {
       lock synchronized {
@@ -458,9 +479,14 @@ class Log(val dir: File,
   def logStartOffset: Long = logSegments.head.baseOffset
 
   /**
+   * The offset metadata of the next message that will be appended to the log
+   */
+  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+
+  /**
    *  The offset of the next message that will be appended to the log
    */
-  def logEndOffset: Long = nextOffset.get
+  def logEndOffset: Long = nextOffsetMetadata.messageOffset
 
   /**
    * Roll the log over to a new empty log segment if necessary
@@ -582,7 +608,7 @@ class Log(val dir: File,
         val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
         deletable.foreach(deleteSegment(_))
         activeSegment.truncateTo(targetOffset)
-        this.nextOffset.set(targetOffset)
+        updateLogEndOffset(targetOffset)
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
       }
     }
@@ -602,7 +628,7 @@ class Log(val dir: File,
                                 indexIntervalBytes = config.indexInterval, 
                                 maxIndexSize = config.maxIndexSize,
                                 time = time))
-      this.nextOffset.set(newOffset)
+      updateLogEndOffset(newOffset)
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index afbeffc..c20de4a 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -17,20 +17,22 @@
 
 package kafka.log
 
+import kafka.common._
+import kafka.message._
+import kafka.utils._
+import kafka.metrics.KafkaMetricsGroup
+
 import scala.collection._
 import scala.math
 import java.nio._
 import java.util.Date
 import java.io.File
-import kafka.common._
-import kafka.message._
-import kafka.utils._
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
 import java.lang.IllegalStateException
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
 
+import com.yammer.metrics.core.Gauge
+
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
  * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
@@ -325,7 +327,6 @@ private[log] class Cleaner(val id: Int,
    * @param log The log being cleaned
    * @param segments The group of segments being cleaned
    * @param map The offset map to use for cleaning segments
-   * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
    * @param deleteHorizonMs The time to retain delete tombstones
    */
   private[log] def cleanSegments(log: Log,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0d6926e..7597d30 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -14,15 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package kafka.log
+package kafka.log
 
-import scala.math._
-import java.io.File
 import kafka.message._
 import kafka.common._
 import kafka.utils._
+import kafka.server.{LogOffsetMetadata, FetchDataInfo}
 
-/**
+import scala.math._
+import java.io.File
+
+
+ /**
  * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
  * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each 
  * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
@@ -86,7 +89,7 @@ class LogSegment(val log: FileMessageSet,
    * Find the physical file position for the first message with offset >= the requested offset.
    * 
    * The lowerBound argument is an optimization that can be used if we already know a valid starting position
-   * in the file higher than the greast-lower-bound from the index.
+   * in the file higher than the greatest-lower-bound from the index.
    * 
    * @param offset The offset we want to translate
    * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
@@ -99,7 +102,7 @@ class LogSegment(val log: FileMessageSet,
     val mapping = index.lookup(offset)
     log.searchFor(offset, max(mapping.position, startingFilePosition))
   }
-  
+
   /**
    * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
    * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
@@ -108,22 +111,27 @@ class LogSegment(val log: FileMessageSet,
    * @param maxSize The maximum number of bytes to include in the message set we read
    * @param maxOffset An optional maximum offset for the message set we read
    * 
-   * @return The message set read or null if the startOffset is larger than the largest offset in this log.
+   * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
+   *         or null if the startOffset is larger than the largest offset in this log
    */
   @threadsafe
-  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = {
+  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
     if(maxSize < 0)
       throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
-    if(maxSize == 0)
-      return MessageSet.Empty
-    
+
     val logSize = log.sizeInBytes // this may change, need to save a consistent copy
     val startPosition = translateOffset(startOffset)
-    
+
     // if the start position is already off the end of the log, return null
     if(startPosition == null)
       return null
-    
+
+    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
+
+    // if the size is zero, still return the offset metadata but with an empty message set
+    if(maxSize == 0)
+      return FetchDataInfo(offsetMetadata, MessageSet.Empty)
+
     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
     val length = 
       maxOffset match {
@@ -143,7 +151,7 @@ class LogSegment(val log: FileMessageSet,
           min(endPosition - startPosition.position, maxSize) 
         }
       }
-    log.read(startPosition.position, length)
+    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
   }
   
   /**
@@ -222,7 +230,7 @@ class LogSegment(val log: FileMessageSet,
     if(ms == null) {
       baseOffset
     } else {
-      ms.lastOption match {
+      ms.messageSet.lastOption match {
         case None => baseOffset
         case Some(last) => last.nextOffset
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3b15254..2e9532e 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,21 +18,22 @@
 package kafka.server
 
 import kafka.cluster.Broker
-import collection.mutable
-import scala.collection.Set
-import scala.collection.Map
-import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
 import kafka.utils.{Pool, ShutdownableThread}
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import kafka.utils.Utils.inLock
+import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
+import kafka.metrics.KafkaMetricsGroup
+
+import scala.collection.mutable
+import scala.collection.Set
+import scala.collection.Map
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.atomic.AtomicLong
 
+import com.yammer.metrics.core.Gauge
 
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
@@ -92,12 +93,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     val partitionsWithError = new mutable.HashSet[TopicAndPartition]
     var response: FetchResponse = null
     try {
-      trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+      trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
-          warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.getMessage))
+          warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
           partitionMapLock synchronized {
             partitionsWithError ++= partitionMap.keys
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
new file mode 100644
index 0000000..e0f14e2
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.network.RequestChannel
+import kafka.api.{FetchResponse, FetchRequest}
+import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition}
+
+import scala.collection.immutable.Map
+import scala.collection.Seq
+
+/**
+ * A delayed fetch request, which is satisfied (or more
+ * accurately, unblocked) -- if:
+ * Case A: This broker is no longer the leader for some partitions it tries to fetch
+ *   - should return whatever data is available for the remaining partitions.
+ * Case B: This broker does not know of some partitions it tries to fetch
+ *   - should return whatever data is available for the remaining partitions.
+ * Case C: The fetch offset is not located on the last segment of the log
+ *   - should return all the data on that segment.
+ * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+ *   - should return whatever data is available.
+ */
+
+class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey],
+                   override val request: RequestChannel.Request,
+                   override val delayMs: Long,
+                   val fetch: FetchRequest,
+                   private val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata])
+  extends DelayedRequest(keys, request, delayMs) {
+
+  def isSatisfied(replicaManager: ReplicaManager) : Boolean = {
+    var accumulatedSize = 0
+    val fromFollower = fetch.isFromFollower
+    partitionFetchOffsets.foreach {
+      case (topicAndPartition, fetchOffset) =>
+        try {
+          if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
+            val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
+            val endOffset =
+              if (fromFollower)
+                replica.logEndOffset
+              else
+                replica.highWatermark
+
+            if (endOffset.offsetOnOlderSegment(fetchOffset)) {
+              // Case C, this can happen when a new follower replica is fetching from a truncated leader
+              debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition))
+              return true
+            } else if (fetchOffset.offsetOnOlderSegment(endOffset)) {
+              // Case C, this can happen when the follower replica is lagging too far behind
+              debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch))
+              return true
+            } else if (fetchOffset.precedes(endOffset)) {
+              accumulatedSize += endOffset.positionDiff(fetchOffset)
+            }
+          }
+        } catch {
+          case utpe: UnknownTopicOrPartitionException => // Case B
+            debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetch))
+            return true
+          case nle: NotLeaderForPartitionException =>  // Case A
+            debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetch))
+            return true
+        }
+    }
+
+    // Case D
+    accumulatedSize >= fetch.minBytes
+  }
+
+  def respond(replicaManager: ReplicaManager): FetchResponse = {
+    val topicData = replicaManager.readMessageSets(fetch)
+    FetchResponse(fetch.correlationId, topicData.mapValues(_.data))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
new file mode 100644
index 0000000..9481508
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.api._
+import kafka.common.ErrorMapping
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+import kafka.network.RequestChannel
+
+import scala.Some
+import scala.collection.immutable.Map
+import scala.collection.Seq
+
+/** A delayed produce request, which is satisfied (or more
+  * accurately, unblocked) -- if for every partition it produces to:
+  * Case A: This broker is not the leader: unblock - should return error.
+  * Case B: This broker is the leader:
+  *   B.1 - If there was a localError (when writing to the local log): unblock - should return error
+  *   B.2 - else, at least requiredAcks replicas should be caught up to this request.
+  */
+
+class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey],
+                     override val request: RequestChannel.Request,
+                     override val delayMs: Long,
+                     val produce: ProducerRequest,
+                     val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
+                     val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
+  extends DelayedRequest(keys, request, delayMs) with Logging {
+
+  // first update the acks pending variable according to the error code
+  partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
+    if (delayedStatus.responseStatus.error == ErrorMapping.NoError) {
+      // Timeout error state will be cleared when required acks are received
+      delayedStatus.acksPending = true
+      delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode
+    } else {
+      delayedStatus.acksPending = false
+    }
+
+    trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
+  }
+
+  def respond(offsetManager: OffsetManager): RequestOrResponse = {
+    val responseStatus = partitionStatus.mapValues(status => status.responseStatus)
+
+    val errorCode = responseStatus.find { case (_, status) =>
+      status.error != ErrorMapping.NoError
+    }.map(_._2.error).getOrElse(ErrorMapping.NoError)
+
+    if (errorCode == ErrorMapping.NoError) {
+      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
+    }
+
+    val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize))
+      .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
+
+    response
+  }
+
+  def isSatisfied(replicaManager: ReplicaManager) = {
+    // check for each partition if it still has pending acks
+    partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
+      trace("Checking producer request satisfaction for %s, acksPending = %b"
+        .format(topicAndPartition, fetchPartitionStatus.acksPending))
+      // skip those partitions that have already been satisfied
+      if (fetchPartitionStatus.acksPending) {
+        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
+        val (hasEnough, errorCode) = partitionOpt match {
+          case Some(partition) =>
+            partition.checkEnoughReplicasReachOffset(
+              fetchPartitionStatus.requiredOffset,
+              produce.requiredAcks)
+          case None =>
+            (false, ErrorMapping.UnknownTopicOrPartitionCode)
+        }
+        if (errorCode != ErrorMapping.NoError) {
+          fetchPartitionStatus.acksPending = false
+          fetchPartitionStatus.responseStatus.error = errorCode
+        } else if (hasEnough) {
+          fetchPartitionStatus.acksPending = false
+          fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
+        }
+      }
+    }
+
+    // unblocked if there are no partitions with pending acks
+    val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
+    satisfied
+  }
+}
+
+case class DelayedProduceResponseStatus(val requiredOffset: Long,
+                                        val responseStatus: ProducerResponseStatus) {
+  @volatile var acksPending = false
+
+  override def toString =
+    "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
+      acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/DelayedRequestKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedRequestKey.scala b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
new file mode 100644
index 0000000..628ef59
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Keys used for delayed request metrics recording
+ */
+trait DelayedRequestKey {
+  def keyLabel: String
+}
+
+object DelayedRequestKey {
+  val globalLabel = "All"
+}
+
+case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey {
+
+  def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
+
+  override def keyLabel = "%s-%d".format(topic, partition)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/FetchDataInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
new file mode 100644
index 0000000..26f278f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.message.MessageSet
+
+case class FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
new file mode 100644
index 0000000..ed13188
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.network.RequestChannel
+import kafka.api.FetchResponseSend
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed fetch requests
+ */
+class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
+  extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
+  this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+  private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
+    private val metricPrefix = if (forFollower) "Follower" else "Consumer"
+
+    val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  }
+
+  private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
+  private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
+
+  private def recordDelayedFetchExpired(forFollower: Boolean) {
+    val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+    else aggregateNonFollowerFetchRequestMetrics
+
+    metrics.expiredRequestMeter.mark()
+  }
+
+  /**
+   * Check if a specified delayed fetch request is satisfied
+   */
+  def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
+
+  /**
+   * When a delayed fetch request expires just answer it with whatever data is present
+   */
+  def expire(delayedFetch: DelayedFetch) {
+    debug("Expiring fetch request %s.".format(delayedFetch.fetch))
+    val fromFollower = delayedFetch.fetch.isFromFollower
+    recordDelayedFetchExpired(fromFollower)
+    respond(delayedFetch)
+  }
+
+  // TODO: purgatory should not be responsible for sending back the responses
+  def respond(delayedFetch: DelayedFetch) {
+    val response = delayedFetch.respond(replicaManager)
+    requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fd5f12e..bb94673 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -23,13 +23,10 @@ import kafka.log._
 import kafka.message._
 import kafka.network._
 import kafka.admin.AdminUtils
-import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel.Response
 import kafka.controller.KafkaController
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{SystemTime, Logging}
 
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
 import scala.collection._
 
 import org.I0Itec.zkclient.ZkClient
@@ -45,11 +42,10 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val config: KafkaConfig,
                 val controller: KafkaController) extends Logging {
 
-  private val producerRequestPurgatory =
-    new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
-  private val fetchRequestPurgatory =
-    new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
-  private val delayedRequestMetrics = new DelayedRequestMetrics
+  val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel)
+  val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel)
+  // TODO: the following line will be removed in 0.9
+  replicaManager.initWithRequestPurgatory(producerRequestPurgatory, fetchRequestPurgatory)
   var metadataCache = new MetadataCache
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
@@ -127,22 +123,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
   }
 
-  /**
-   * Check if a partitionData from a produce request can unblock any
-   * DelayedFetch requests.
-   */
-  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
-    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
-    trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
-
-    // send any newly unblocked responses
-    for(fetchReq <- satisfied) {
-      val topicData = readMessageSets(fetchReq.fetch)
-      val response = FetchResponse(fetchReq.fetch.correlationId, topicData)
-      requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
-    }
-  }
-
   private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = {
     val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map {
       case (topicAndPartition, offset) =>
@@ -171,27 +151,21 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle a produce request or offset commit request (which is really a specialized producer request)
    */
   def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
-
-    val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) {
-      val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
-      (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
-    }
-    else {
-      (request.requestObj.asInstanceOf[ProducerRequest], None)
-    }
+    val (produceRequest, offsetCommitRequestOpt) =
+      if (request.requestId == RequestKeys.OffsetCommitKey) {
+        val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+        (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
+      } else {
+        (request.requestObj.asInstanceOf[ProducerRequest], None)
+      }
 
     val sTime = SystemTime.milliseconds
     val localProduceResults = appendToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
     val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
 
     val numPartitionsInError = localProduceResults.count(_.error.isDefined)
-    produceRequest.data.foreach(partitionAndData =>
-      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
-
-    val allPartitionHaveReplicationFactorOne =
-      !produceRequest.data.keySet.exists(
-        m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
     if(produceRequest.requiredAcks == 0) {
       // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
       // no response is expected by the producer the handler will send a close connection response to the socket server
@@ -214,7 +188,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     } else if (produceRequest.requiredAcks == 1 ||
         produceRequest.numPartitions <= 0 ||
-        allPartitionHaveReplicationFactorOne ||
         numPartitionsInError == produceRequest.numPartitions) {
 
       if (firstErrorCode == ErrorMapping.NoError) {
@@ -229,46 +202,27 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val producerRequestKeys = produceRequest.data.keys.map(
-        topicAndPartition => new RequestKey(topicAndPartition)).toSeq
+        topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq
       val statuses = localProduceResults.map(r =>
         r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
       val delayedRequest =  new DelayedProduce(
         producerRequestKeys,
         request,
-        statuses,
-        produceRequest,
         produceRequest.ackTimeoutMs.toLong,
+        produceRequest,
+        statuses,
         offsetCommitRequestOpt)
 
-      producerRequestPurgatory.watch(delayedRequest)
-
-      /*
-       * Replica fetch requests may have arrived (and potentially satisfied)
-       * delayedProduce requests while they were being added to the purgatory.
-       * Here, we explicitly check if any of them can be satisfied.
-       */
-      var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
-      producerRequestKeys.foreach(key =>
-        satisfiedProduceRequests ++=
-          producerRequestPurgatory.update(key, key))
-      debug(satisfiedProduceRequests.size +
-        " producer requests unblocked during produce to local log.")
-      satisfiedProduceRequests.foreach(_.respond())
-
-      // we do not need the data anymore
-      produceRequest.emptyData()
+      // watch the produce request if it is not yet satisfied; otherwise send the response back right away
+      val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
+      if (satisfiedByMe)
+        producerRequestPurgatory.respond(delayedRequest)
     }
-  }
-
-  case class DelayedProduceResponseStatus(requiredOffset: Long,
-                                          status: ProducerResponseStatus) {
-    var acksPending = false
 
-    override def toString =
-      "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
-        acksPending, status.error, status.offset, requiredOffset)
+    // we do not need the data anymore
+    produceRequest.emptyData()
   }
-  
+
   case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
     def this(key: TopicAndPartition, throwable: Throwable) = 
       this(key, -1L, -1L, Some(throwable))
@@ -288,13 +242,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     partitionAndData.map {case (topicAndPartition, messages) =>
       try {
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
-        val info =
-          partitionOpt match {
-            case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
-            case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
-              .format(topicAndPartition, brokerId))
-
-          }
+        val info = partitionOpt match {
+          case Some(partition) =>
+            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+            .format(topicAndPartition, brokerId))
+        }
 
         val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
 
@@ -338,121 +291,58 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
-    if(fetchRequest.isFromFollower) {
-      maybeUpdatePartitionHw(fetchRequest)
-      // after updating HW, some delayed produce requests may be unblocked
-      var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
-      fetchRequest.requestInfo.foreach {
-        case (topicAndPartition, _) =>
-          val key = new RequestKey(topicAndPartition)
-          satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
-      }
-      debug("Replica %d fetch unblocked %d producer requests."
-        .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
-      satisfiedProduceRequests.foreach(_.respond())
-    }
-
-    val dataRead = readMessageSets(fetchRequest)
-    val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum
+    val dataRead = replicaManager.readMessageSets(fetchRequest)
+
+    // if the fetch request comes from the follower,
+    // update its corresponding log end offset
+    if(fetchRequest.isFromFollower)
+      recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))
+
+    // check if this fetch request can be satisfied right away
+    val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
+    val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) =>
+      errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError))
+    // send the data immediately if 1) fetch request does not want to wait
+    //                              2) fetch request does not require any data
+    //                              3) has enough data to respond
+    //                              4) some error happens while reading data
     if(fetchRequest.maxWait <= 0 ||
+       fetchRequest.numPartitions <= 0 ||
        bytesReadable >= fetchRequest.minBytes ||
-       fetchRequest.numPartitions <= 0) {
+       errorReadingData) {
       debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
-        .format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
-      val response = new FetchResponse(fetchRequest.correlationId, dataRead)
+        .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
+      val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
       debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
         fetchRequest.clientId))
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
-      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable)
-      fetchRequestPurgatory.watch(delayedFetch)
+      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_))
+      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
+        dataRead.mapValues(_.offset))
+
+      // watch the fetch request if it is not yet satisfied; otherwise send the response back right away
+      val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch)
+      if (satisfiedByMe)
+        fetchRequestPurgatory.respond(delayedFetch)
     }
   }
 
-  private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) {
-    debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
-    fetchRequest.requestInfo.foreach(info => {
-      val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset)
-      replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
-    })
-  }
+  private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
+    debug("Record follower log end offsets: %s ".format(offsets))
+    offsets.foreach {
+      case (topicAndPartition, offset) =>
+        replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
+          topicAndPartition.partition, replicaId, offset)
 
-  /**
-   * Read from all the offset details given and return a map of
-   * (topic, partition) -> PartitionData
-   */
-  private def readMessageSets(fetchRequest: FetchRequest) = {
-    val isFetchFromFollower = fetchRequest.isFromFollower
-    fetchRequest.requestInfo.map
-    {
-      case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
-        val partitionData =
-          try {
-            val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
-            BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
-            BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
-            if (!isFetchFromFollower) {
-              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
-            } else {
-              debug("Leader %d for partition [%s,%d] received fetch request from follower %d"
-                            .format(brokerId, topic, partition, fetchRequest.replicaId))
-              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
-            }
-          } catch {
-            // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
-            // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
-            // for a partition it is the leader for
-            case utpe: UnknownTopicOrPartitionException =>
-              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
-                   fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
-              new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-            case nle: NotLeaderForPartitionException =>
-              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
-                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
-              new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-            case t: Throwable =>
-              BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
-              BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
-                    .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
-              new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-          }
-        (TopicAndPartition(topic, partition), partitionData)
+        // for delayed producer requests (required acks other than 0 or 1), we need to check
+        // if they can be unblocked after the follower's log end offset has been updated
+        replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))
     }
   }
 
   /**
-   * Read from a single topic/partition at the given offset upto maxSize bytes
-   */
-  private def readMessageSet(topic: String, 
-                             partition: Int, 
-                             offset: Long,
-                             maxSize: Int, 
-                             fromReplicaId: Int): (MessageSet, Long) = {
-    // check if the current broker is the leader for the partitions
-    val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
-      replicaManager.getReplicaOrException(topic, partition)
-    else
-      replicaManager.getLeaderReplicaIfLocal(topic, partition)
-    trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-    val maxOffsetOpt = 
-      if (Request.isValidBrokerId(fromReplicaId))
-        None
-      else
-        Some(localReplica.highWatermark)
-    val messages = localReplica.log match {
-      case Some(log) =>
-        log.read(offset, maxSize, maxOffsetOpt)
-      case None =>
-        error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic, partition, brokerId))
-        MessageSet.Empty
-    }
-    (messages, localReplica.highWatermark)
-  }
-
-  /**
    * Service the offset request API 
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
@@ -473,7 +363,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (!offsetRequest.isFromOrdinaryClient) {
             allOffsets
           } else {
-            val hw = localReplica.highWatermark
+            val hw = localReplica.highWatermark.messageOffset
             if (allOffsets.exists(_ > hw))
               hw +: allOffsets.dropWhile(_ > hw)
             else 
@@ -643,209 +533,5 @@ class KafkaApis(val requestChannel: RequestChannel,
     producerRequestPurgatory.shutdown()
     debug("Shut down complete.")
   }
-
-  private [kafka] trait MetricKey {
-    def keyLabel: String
-  }
-  private [kafka] object MetricKey {
-    val globalLabel = "All"
-  }
-
-  private [kafka] case class RequestKey(topic: String, partition: Int)
-          extends MetricKey {
-
-    def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
-
-    def topicAndPartition = TopicAndPartition(topic, partition)
-
-    override def keyLabel = "%s-%d".format(topic, partition)
-  }
-
-  /**
-   * A delayed fetch request
-   */
-  class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long)
-    extends DelayedRequest(keys, request, delayMs) {
-    val bytesAccumulated = new AtomicLong(initialSize)
-  }
-
-  /**
-   * A holding pen for fetch requests waiting to be satisfied
-   */
-  class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
-          extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
-    this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
-
-    /**
-     * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
-     */
-    def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
-      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
-      accumulatedSize >= delayedFetch.fetch.minBytes
-    }
-
-    /**
-     * When a request expires just answer it with whatever data is present
-     */
-    def expire(delayed: DelayedFetch) {
-      debug("Expiring fetch request %s.".format(delayed.fetch))
-      try {
-        val topicData = readMessageSets(delayed.fetch)
-        val response = FetchResponse(delayed.fetch.correlationId, topicData)
-        val fromFollower = delayed.fetch.isFromFollower
-        delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
-        requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
-      }
-      catch {
-        case e1: LeaderNotAvailableException =>
-          debug("Leader changed before fetch request %s expired.".format(delayed.fetch))
-        case e2: UnknownTopicOrPartitionException =>
-          debug("Replica went offline before fetch request %s expired.".format(delayed.fetch))
-      }
-    }
-  }
-
-  class DelayedProduce(keys: Seq[RequestKey],
-                       request: RequestChannel.Request,
-                       val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus],
-                       produce: ProducerRequest,
-                       delayMs: Long,
-                       offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
-          extends DelayedRequest(keys, request, delayMs) with Logging {
-
-    // first update the acks pending variable according to the error code
-    partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
-      if (delayedStatus.status.error == ErrorMapping.NoError) {
-        // Timeout error state will be cleared when requiredAcks are received
-        delayedStatus.acksPending = true
-        delayedStatus.status.error = ErrorMapping.RequestTimedOutCode
-      } else {
-        delayedStatus.acksPending = false
-      }
-
-      trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
-    }
-
-    def respond() {
-      val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
-        topicAndPartition -> delayedStatus.status
-      }
-
-      val errorCode = responseStatus.find { case (_, status) =>
-        status.error != ErrorMapping.NoError
-      }.map(_._2.error).getOrElse(ErrorMapping.NoError)
-
-      if (errorCode == ErrorMapping.NoError) {
-        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
-      }
-
-      val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, config.offsetMetadataMaxSize))
-                                           .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
-
-      requestChannel.sendResponse(new RequestChannel.Response(
-        request, new BoundedByteBufferSend(response)))
-    }
-
-    /**
-     * Returns true if this delayed produce request is satisfied (or more
-     * accurately, unblocked) -- this is the case if for every partition:
-     * Case A: This broker is not the leader: unblock - should return error.
-     * Case B: This broker is the leader:
-     *   B.1 - If there was a localError (when writing to the local log): unblock - should return error
-     *   B.2 - else, at least requiredAcks replicas should be caught up to this request.
-     *
-     * As partitions become acknowledged, we may be able to unblock
-     * DelayedFetchRequests that are pending on those partitions.
-     */
-    def isSatisfied(followerFetchRequestKey: RequestKey) = {
-      val topic = followerFetchRequestKey.topic
-      val partitionId = followerFetchRequestKey.partition
-      val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId))
-      trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
-        .format(topic, partitionId, fetchPartitionStatus.acksPending))
-      if (fetchPartitionStatus.acksPending) {
-        val partitionOpt = replicaManager.getPartition(topic, partitionId)
-        val (hasEnough, errorCode) = partitionOpt match {
-          case Some(partition) =>
-            partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
-          case None =>
-            (false, ErrorMapping.UnknownTopicOrPartitionCode)
-        }
-        if (errorCode != ErrorMapping.NoError) {
-          fetchPartitionStatus. acksPending = false
-          fetchPartitionStatus.status.error = errorCode
-        } else if (hasEnough) {
-          fetchPartitionStatus.acksPending = false
-          fetchPartitionStatus.status.error = ErrorMapping.NoError
-        }
-        if (!fetchPartitionStatus.acksPending) {
-          val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
-          maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
-        }
-      }
-
-      // unblocked if there are no partitions with pending acks
-      val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
-      trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
-      satisfied
-    }
-  }
-
-  /**
-   * A holding pen for produce requests waiting to be satisfied.
-   */
-  private [kafka] class ProducerRequestPurgatory(purgeInterval: Int)
-          extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) {
-    this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
-
-    protected def checkSatisfied(followerFetchRequestKey: RequestKey,
-                                 delayedProduce: DelayedProduce) =
-      delayedProduce.isSatisfied(followerFetchRequestKey)
-
-    /**
-     * Handle an expired delayed request
-     */
-    protected def expire(delayedProduce: DelayedProduce) {
-      for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
-        delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(topicPartition.topic, topicPartition.partition))
-
-      delayedProduce.respond()
-    }
-  }
-
-  private class DelayedRequestMetrics {
-    private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
-      val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
-    }
-
-
-    private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
-      private val metricPrefix = if (forFollower) "Follower" else "Consumer"
-
-      val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
-    }
-
-    private val producerRequestMetricsForKey = {
-      val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
-      new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory))
-    }
-
-    private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
-
-    private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
-    private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
-
-    def recordDelayedProducerKeyExpired(key: MetricKey) {
-      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
-      List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
-    }
-
-    def recordDelayedFetchExpired(forFollower: Boolean) {
-      val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
-        else aggregateNonFollowerFetchRequestMetrics
-      
-      metrics.expiredRequestMeter.mark()
-    }
-  }
 }