Posted to commits@kafka.apache.org by ju...@apache.org on 2014/08/06 06:28:12 UTC
[2/2] git commit: kafka-1430; Purgatory redesign; patched by Guozhang Wang; reviewed by Jun Rao
kafka-1430; Purgatory redesign; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0dc243b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0dc243b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0dc243b9
Branch: refs/heads/trunk
Commit: 0dc243b92a6ae1683795caf8222edc1b2bb49565
Parents: 7a67a72
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Tue Aug 5 21:27:57 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Aug 5 21:27:57 2014 -0700
----------------------------------------------------------------------
.../src/main/scala/kafka/api/FetchRequest.scala | 6 +-
.../main/scala/kafka/api/FetchResponse.scala | 3 +-
.../main/scala/kafka/cluster/Partition.scala | 161 ++++---
core/src/main/scala/kafka/cluster/Replica.scala | 80 ++--
.../scala/kafka/consumer/SimpleConsumer.scala | 2 +-
core/src/main/scala/kafka/log/Log.scala | 74 ++-
core/src/main/scala/kafka/log/LogCleaner.scala | 13 +-
core/src/main/scala/kafka/log/LogSegment.scala | 38 +-
.../kafka/server/AbstractFetcherThread.scala | 17 +-
.../main/scala/kafka/server/DelayedFetch.scala | 91 ++++
.../scala/kafka/server/DelayedProduce.scala | 115 +++++
.../scala/kafka/server/DelayedRequestKey.scala | 38 ++
.../main/scala/kafka/server/FetchDataInfo.scala | 22 +
.../kafka/server/FetchRequestPurgatory.scala | 69 +++
.../src/main/scala/kafka/server/KafkaApis.scala | 446 +++----------------
.../scala/kafka/server/LogOffsetMetadata.scala | 87 ++++
.../main/scala/kafka/server/OffsetManager.scala | 29 +-
.../kafka/server/ProducerRequestPurgatory.scala | 69 +++
.../kafka/server/ReplicaFetcherThread.scala | 25 +-
.../scala/kafka/server/ReplicaManager.scala | 183 ++++++--
.../scala/kafka/server/RequestPurgatory.scala | 98 ++--
.../scala/kafka/tools/TestEndToEndLatency.scala | 23 +-
.../test/scala/other/kafka/StressTestLog.scala | 2 +-
.../kafka/integration/PrimitiveApiTest.scala | 10 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 4 +-
.../scala/unit/kafka/log/LogSegmentTest.scala | 16 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 26 +-
.../kafka/message/BaseMessageSetTestCases.scala | 2 +-
.../server/HighwatermarkPersistenceTest.scala | 38 +-
.../unit/kafka/server/ISRExpirationTest.scala | 8 +-
.../unit/kafka/server/LogRecoveryTest.scala | 10 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 4 +-
.../kafka/server/RequestPurgatoryTest.scala | 26 +-
.../unit/kafka/server/SimpleFetchTest.scala | 58 ++-
34 files changed, 1149 insertions(+), 744 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 55a5982..51cdccf 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -17,16 +17,16 @@
package kafka.api
-import java.nio.ByteBuffer
import kafka.utils.nonthreadsafe
import kafka.api.ApiUtils._
-import scala.collection.immutable.Map
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.ConsumerConfig
-import java.util.concurrent.atomic.AtomicInteger
import kafka.network.RequestChannel
import kafka.message.MessageSet
+import java.util.concurrent.atomic.AtomicInteger
+import java.nio.ByteBuffer
+import scala.collection.immutable.Map
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index d117f10..af93087 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -19,6 +19,7 @@ package kafka.api
import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel
+
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.message.{MessageSet, ByteBufferMessageSet}
import kafka.network.{MultiSend, Send}
@@ -151,7 +152,7 @@ object FetchResponse {
case class FetchResponse(correlationId: Int,
- data: Map[TopicAndPartition, FetchResponsePartitionData]) {
+ data: Map[TopicAndPartition, FetchResponsePartitionData]) {
/**
* Partitions the data into a map of maps (one for each topic).
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 134aef9..ff106b4 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
import kafka.utils._
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
-import kafka.server.{OffsetManager, ReplicaManager}
+import kafka.server.{TopicPartitionRequestKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet
@@ -29,7 +29,8 @@ import kafka.message.ByteBufferMessageSet
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.Utils.{inReadLock,inWriteLock}
-import scala.collection._
+import scala.Some
+import scala.collection.immutable.Set
import com.yammer.metrics.core.Gauge
@@ -39,18 +40,18 @@ import com.yammer.metrics.core.Gauge
*/
class Partition(val topic: String,
val partitionId: Int,
- var replicationFactor: Int,
time: Time,
- val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
+ replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
private val localBrokerId = replicaManager.config.brokerId
private val logManager = replicaManager.logManager
private val zkClient = replicaManager.zkClient
- var leaderReplicaIdOpt: Option[Int] = None
- var inSyncReplicas: Set[Replica] = Set.empty[Replica]
- private val assignedReplicaMap = new Pool[Int,Replica]
+ private val assignedReplicaMap = new Pool[Int, Replica]
+ // The read lock is only required when multiple reads are executed and need to be done in a consistent manner
private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
private var zkVersion: Int = LeaderAndIsr.initialZKVersion
- private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+ @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+ @volatile var leaderReplicaIdOpt: Option[Int] = None
+ @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
/* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
* One way of doing that is through the controller's start replica state change command. When a new broker starts up
* the controller sends it a start replica command containing the leader for each partition that the broker hosts.
@@ -58,7 +59,6 @@ class Partition(val topic: String,
* each partition. */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
- private val stateChangeLogger = KafkaController.stateChangeLogger
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -72,13 +72,11 @@ class Partition(val topic: String,
)
def isUnderReplicated(): Boolean = {
- inReadLock(leaderIsrUpdateLock) {
- leaderReplicaIfLocal() match {
- case Some(_) =>
- inSyncReplicas.size < assignedReplicas.size
- case None =>
- false
- }
+ leaderReplicaIfLocal() match {
+ case Some(_) =>
+ inSyncReplicas.size < assignedReplicas.size
+ case None =>
+ false
}
}
@@ -114,15 +112,13 @@ class Partition(val topic: String,
}
def leaderReplicaIfLocal(): Option[Replica] = {
- inReadLock(leaderIsrUpdateLock) {
- leaderReplicaIdOpt match {
- case Some(leaderReplicaId) =>
- if (leaderReplicaId == localBrokerId)
- getReplica(localBrokerId)
- else
- None
- case None => None
- }
+ leaderReplicaIdOpt match {
+ case Some(leaderReplicaId) =>
+ if (leaderReplicaId == localBrokerId)
+ getReplica(localBrokerId)
+ else
+ None
+ case None => None
}
}
@@ -155,9 +151,7 @@ class Partition(val topic: String,
}
def getLeaderEpoch(): Int = {
- inReadLock(leaderIsrUpdateLock) {
- return this.leaderEpoch
- }
+ return this.leaderEpoch
}
/**
@@ -179,14 +173,17 @@ class Partition(val topic: String,
val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
// remove assigned replicas that have been removed by the controller
(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
- // reset LogEndOffset for remote replicas
- assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
inSyncReplicas = newInSyncReplicas
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt = Some(localBrokerId)
+ // construct the high watermark metadata for the new leader replica
+ val newLeaderReplica = getReplica().get
+ newLeaderReplica.convertHWToLocalOffsetMetadata()
+ // reset log end offset for remote replicas
+ assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
// we may need to increment high watermark since ISR could be down to 1
- maybeIncrementLeaderHW(getReplica().get)
+ maybeIncrementLeaderHW(newLeaderReplica)
if (topic == OffsetManager.OffsetsTopicName)
offsetManager.loadOffsetsFromLog(partitionId)
true
@@ -233,18 +230,8 @@ class Partition(val topic: String,
}
}
- def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
+ def updateLeaderHWAndMaybeExpandIsr(replicaId: Int) {
inWriteLock(leaderIsrUpdateLock) {
- debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
- val replicaOpt = getReplica(replicaId)
- if(!replicaOpt.isDefined) {
- throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" +
- " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
- offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
- }
- val replica = replicaOpt.get
- replica.logEndOffset = offset
-
// check if this replica needs to be added to the ISR
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
@@ -253,8 +240,10 @@ class Partition(val topic: String,
// For a replica to get added back to ISR, it has to satisfy 3 conditions-
// 1. It is not already in the ISR
// 2. It is part of the assigned replica list. See KAFKA-1097
- // 3. It's log end offset >= leader's highwatermark
- if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) {
+ // 3. It's log end offset >= leader's high watermark
+ if (!inSyncReplicas.contains(replica) &&
+ assignedReplicas.map(_.brokerId).contains(replicaId) &&
+ replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
// expand ISR
val newInSyncReplicas = inSyncReplicas + replica
info("Expanding ISR for partition [%s,%d] from %s to %s"
@@ -270,29 +259,29 @@ class Partition(val topic: String,
}
def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
- inReadLock(leaderIsrUpdateLock) {
- leaderReplicaIfLocal() match {
- case Some(_) =>
- val numAcks = inSyncReplicas.count(r => {
- if (!r.isLocal)
- r.logEndOffset >= requiredOffset
- else
- true /* also count the local (leader) replica */
- })
- trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
- if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
- (requiredAcks > 0 && numAcks >= requiredAcks)) {
- /*
- * requiredAcks < 0 means acknowledge after all replicas in ISR
- * are fully caught up to the (local) leader's offset
- * corresponding to this produce request.
- */
- (true, ErrorMapping.NoError)
- } else
- (false, ErrorMapping.NoError)
- case None =>
- (false, ErrorMapping.NotLeaderForPartitionCode)
- }
+ leaderReplicaIfLocal() match {
+ case Some(leaderReplica) =>
+ // keep the current immutable replica list reference
+ val curInSyncReplicas = inSyncReplicas
+ val numAcks = curInSyncReplicas.count(r => {
+ if (!r.isLocal)
+ r.logEndOffset.messageOffset >= requiredOffset
+ else
+ true /* also count the local (leader) replica */
+ })
+ trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
+ if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) ||
+ (requiredAcks > 0 && numAcks >= requiredAcks)) {
+ /*
+ * requiredAcks < 0 means acknowledge after all replicas in ISR
+ * are fully caught up to the (local) leader's offset
+ * corresponding to this produce request.
+ */
+ (true, ErrorMapping.NoError)
+ } else
+ (false, ErrorMapping.NoError)
+ case None =>
+ (false, ErrorMapping.NotLeaderForPartitionCode)
}
}
@@ -302,15 +291,19 @@ class Partition(val topic: String,
*/
private def maybeIncrementLeaderHW(leaderReplica: Replica) {
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
- val newHighWatermark = allLogEndOffsets.min
+ val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
val oldHighWatermark = leaderReplica.highWatermark
- if(newHighWatermark > oldHighWatermark) {
+ if(oldHighWatermark.precedes(newHighWatermark)) {
leaderReplica.highWatermark = newHighWatermark
- debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark))
+ debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
+ // some delayed requests may be unblocked after HW changed
+ val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId)
+ replicaManager.unblockDelayedFetchRequests(requestKey)
+ replicaManager.unblockDelayedProduceRequests(requestKey)
+ } else {
+ debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
+ .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
}
- else
- debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are %s"
- .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
@@ -349,7 +342,9 @@ class Partition(val topic: String,
if(stuckReplicas.size > 0)
debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
// Case 2 above
- val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
+ val slowReplicas = candidateReplicas.filter(r =>
+ r.logEndOffset.messageOffset >= 0 &&
+ leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
if(slowReplicas.size > 0)
debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
stuckReplicas ++ slowReplicas
@@ -362,6 +357,8 @@ class Partition(val topic: String,
case Some(leaderReplica) =>
val log = leaderReplica.log.get
val info = log.append(messages, assignOffsets = true)
+ // probably unblock some follower fetch requests since log end offset has been updated
+ replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
info
@@ -399,14 +396,12 @@ class Partition(val topic: String,
}
override def toString(): String = {
- inReadLock(leaderIsrUpdateLock) {
- val partitionString = new StringBuilder
- partitionString.append("Topic: " + topic)
- partitionString.append("; Partition: " + partitionId)
- partitionString.append("; Leader: " + leaderReplicaIdOpt)
- partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
- partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
- partitionString.toString()
- }
+ val partitionString = new StringBuilder
+ partitionString.append("Topic: " + topic)
+ partitionString.append("; Partition: " + partitionId)
+ partitionString.append("; Leader: " + leaderReplicaIdOpt)
+ partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+ partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+ partitionString.toString()
}
}
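
In the rewritten Partition code above, offset comparisons (offsetDiff, precedes, the OffsetOrdering used by maybeIncrementLeaderHW) go through the new LogOffsetMetadata type instead of plain longs; the DelayedFetch code later in this patch additionally uses offsetOnOlderSegment and positionDiff. LogOffsetMetadata.scala is added by this commit but its diff is not reproduced in this excerpt, so the following is only a minimal, self-contained sketch of the operations the Partition code relies on; the names and semantics here are illustrative, not the actual class.

// Illustrative sketch only -- not the LogOffsetMetadata added by this patch.
case class OffsetMetadataSketch(messageOffset: Long,
                                segmentBaseOffset: Long,
                                relativePositionInSegment: Int) {
  // strictly earlier by message offset; used to decide whether the high watermark advanced
  def precedes(that: OffsetMetadataSketch): Boolean =
    this.messageOffset < that.messageOffset

  // signed distance in message offsets; >= 0 means "caught up", as in the ISR-expansion check
  def offsetDiff(that: OffsetMetadataSketch): Long =
    this.messageOffset - that.messageOffset

  // whether this offset lives on an older log segment than `that`
  def offsetOnOlderSegment(that: OffsetMetadataSketch): Boolean =
    this.segmentBaseOffset < that.segmentBaseOffset

  // byte distance within the same segment; used to accumulate available fetch bytes
  def positionDiff(that: OffsetMetadataSketch): Int =
    this.relativePositionInSegment - that.relativePositionInSegment
}

object OffsetMetadataSketch {
  // ordering by message offset, analogous to the OffsetOrdering used in maybeIncrementLeaderHW
  implicit val byMessageOffset: Ordering[OffsetMetadataSketch] = Ordering.by(_.messageOffset)
}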
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 5e659b4..bd13c20 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -19,8 +19,9 @@ package kafka.cluster
import kafka.log.Log
import kafka.utils.{SystemTime, Time, Logging}
+import kafka.server.LogOffsetMetadata
import kafka.common.KafkaException
-import kafka.server.ReplicaManager
+
import java.util.concurrent.atomic.AtomicLong
class Replica(val brokerId: Int,
@@ -28,33 +29,17 @@ class Replica(val brokerId: Int,
time: Time = SystemTime,
initialHighWatermarkValue: Long = 0L,
val log: Option[Log] = None) extends Logging {
- //only defined in local replica
- private[this] var highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue)
- // only used for remote replica; logEndOffsetValue for local replica is kept in log
- private[this] var logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset)
- private[this] var logEndOffsetUpdateTimeMsValue: AtomicLong = new AtomicLong(time.milliseconds)
+ // the high watermark offset value; for non-leader replicas only its message offset is kept
+ @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
+ // the log end offset value, kept in all replicas;
+ // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
+ @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
+ // the time when log offset is updated
+ private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds)
+
val topic = partition.topic
val partitionId = partition.partitionId
- def logEndOffset_=(newLogEndOffset: Long) {
- if (!isLocal) {
- logEndOffsetValue.set(newLogEndOffset)
- logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
- trace("Setting log end offset for replica %d for partition [%s,%d] to %d"
- .format(brokerId, topic, partitionId, logEndOffsetValue.get()))
- } else
- throw new KafkaException("Shouldn't set logEndOffset for replica %d partition [%s,%d] since it's local"
- .format(brokerId, topic, partitionId))
-
- }
-
- def logEndOffset = {
- if (isLocal)
- log.get.logEndOffset
- else
- logEndOffsetValue.get()
- }
-
def isLocal: Boolean = {
log match {
case Some(l) => true
@@ -62,24 +47,43 @@ class Replica(val brokerId: Int,
}
}
- def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
-
- def highWatermark_=(newHighWatermark: Long) {
+ def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
if (isLocal) {
- trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d"
- .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
- highWatermarkValue.set(newHighWatermark)
- } else
- throw new KafkaException("Unable to set highwatermark for replica %d partition [%s,%d] since it's not local"
- .format(brokerId, topic, partitionId))
+ throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId))
+ } else {
+ logEndOffsetMetadata = newLogEndOffset
+ logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
+ trace("Setting log end offset for replica %d for partition [%s,%d] to [%s]"
+ .format(brokerId, topic, partitionId, logEndOffsetMetadata))
+ }
}
- def highWatermark = {
+ def logEndOffset =
if (isLocal)
- highWatermarkValue.get()
+ log.get.logEndOffsetMetadata
else
- throw new KafkaException("Unable to get highwatermark for replica %d partition [%s,%d] since it's not local"
- .format(brokerId, topic, partitionId))
+ logEndOffsetMetadata
+
+ def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
+
+ def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
+ if (isLocal) {
+ highWatermarkMetadata = newHighWatermark
+ trace("Setting high watermark for replica %d partition [%s,%d] on broker %d to [%s]"
+ .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
+ } else {
+ throw new KafkaException("Should not set high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
+ }
+ }
+
+ def highWatermark = highWatermarkMetadata
+
+ def convertHWToLocalOffsetMetadata() = {
+ if (isLocal) {
+ highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+ } else {
+ throw new KafkaException("Should not construct complete high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
+ }
}
override def equals(that: Any): Boolean = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 0e64632..8db9203 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -71,7 +71,7 @@ class SimpleConsumer(val host: String,
response = blockingChannel.receive()
} catch {
case e : Throwable =>
- info("Reconnect due to socket error: %s".format(e.getMessage))
+ info("Reconnect due to socket error: %s".format(e.toString))
// retry once
try {
reconnect()
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b7bc5ff..0ddf97b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -21,7 +21,7 @@ import kafka.utils._
import kafka.message._
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.BrokerTopicStats
+import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
import java.io.{IOException, File}
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
@@ -51,11 +51,11 @@ import com.yammer.metrics.core.Gauge
class Log(val dir: File,
@volatile var config: LogConfig,
@volatile var recoveryPoint: Long = 0L,
- val scheduler: Scheduler,
+ scheduler: Scheduler,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
-
+
/* A lock that guards all modifications to the log */
private val lock = new Object
@@ -67,7 +67,7 @@ class Log(val dir: File,
loadSegments()
/* Calculate the offset of the next message */
- private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
+ @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
@@ -167,6 +167,10 @@ class Log(val dir: File,
for (s <- logSegments)
s.index.sanityCheck()
}
+
+ private def updateLogEndOffset(messageOffset: Long) {
+ nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
+ }
private def recoverLog() {
// if we have the clean shutdown marker, skip recovery
@@ -246,14 +250,14 @@ class Log(val dir: File,
try {
// they are valid, insert them in the log
lock synchronized {
- appendInfo.firstOffset = nextOffset.get
+ appendInfo.firstOffset = nextOffsetMetadata.messageOffset
// maybe roll the log if this segment is full
val segment = maybeRoll()
if(assignOffsets) {
// assign offsets to the message set
- val offset = new AtomicLong(nextOffset.get)
+ val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
try {
validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
} catch {
@@ -262,7 +266,7 @@ class Log(val dir: File,
appendInfo.lastOffset = offset.get - 1
} else {
// we are taking the offsets we are given
- if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
+ if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
throw new IllegalArgumentException("Out of order offsets found in " + messages)
}
@@ -282,10 +286,10 @@ class Log(val dir: File,
segment.append(appendInfo.firstOffset, validMessages)
// increment the log end offset
- nextOffset.set(appendInfo.lastOffset + 1)
+ updateLogEndOffset(appendInfo.lastOffset + 1)
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
- .format(this.name, appendInfo.firstOffset, nextOffset.get(), validMessages))
+ .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
if(unflushedMessages >= config.flushInterval)
flush()
@@ -307,7 +311,7 @@ class Log(val dir: File,
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
*/
case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
-
+
/**
* Validate the following:
* <ol>
@@ -387,20 +391,21 @@ class Log(val dir: File,
/**
* Read messages from the log
+ *
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
* @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
*
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
- * @return The messages read
+ * @return The fetch data information including fetch starting offset metadata and messages read
*/
- def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
+ def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
// check if the offset is valid and in range
- val next = nextOffset.get
+ val next = nextOffsetMetadata.messageOffset
if(startOffset == next)
- return MessageSet.Empty
+ return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
var entry = segments.floorEntry(startOffset)
@@ -412,15 +417,31 @@ class Log(val dir: File,
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
while(entry != null) {
- val messages = entry.getValue.read(startOffset, maxOffset, maxLength)
- if(messages == null)
+ val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
+ if(fetchInfo == null) {
entry = segments.higherEntry(entry.getKey)
- else
- return messages
+ } else {
+ return fetchInfo
+ }
}
- // okay we are beyond the end of the last segment but less than the log end offset
- MessageSet.Empty
+ // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
+ // this can happen when all messages with an offset larger than the start offset have been deleted.
+ // In this case, we will return the empty set with log end offset metadata
+ FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
+ }
+
+ /**
+ * Given a message offset, find its corresponding offset metadata in the log.
+ * If the message offset is out of range, return unknown offset metadata
+ */
+ def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+ try {
+ val fetchDataInfo = read(offset, 1)
+ fetchDataInfo.fetchOffset
+ } catch {
+ case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+ }
}
/**
@@ -433,7 +454,7 @@ class Log(val dir: File,
// find any segments that match the user-supplied predicate UNLESS it is the final segment
// and it is empty (since we would just end up re-creating it
val lastSegment = activeSegment
- var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
+ val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
val numToDelete = deletable.size
if(numToDelete > 0) {
lock synchronized {
@@ -458,9 +479,14 @@ class Log(val dir: File,
def logStartOffset: Long = logSegments.head.baseOffset
/**
+ * The offset metadata of the next message that will be appended to the log
+ */
+ def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+
+ /**
* The offset of the next message that will be appended to the log
*/
- def logEndOffset: Long = nextOffset.get
+ def logEndOffset: Long = nextOffsetMetadata.messageOffset
/**
* Roll the log over to a new empty log segment if necessary
@@ -582,7 +608,7 @@ class Log(val dir: File,
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
deletable.foreach(deleteSegment(_))
activeSegment.truncateTo(targetOffset)
- this.nextOffset.set(targetOffset)
+ updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
}
}
@@ -602,7 +628,7 @@ class Log(val dir: File,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
time = time))
- this.nextOffset.set(newOffset)
+ updateLogEndOffset(newOffset)
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
}
}
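
Log.read now returns a FetchDataInfo (the new case class added later in this patch), so callers get both the messages and the offset metadata of the fetch start and can compute byte availability without re-reading the log. Below is a small, assumed usage sketch against the new signature; summarizeRead and its argument name are illustrative, and the FetchDataInfo value is expected to come from a call such as log.read(startOffset, maxLength).

import kafka.server.FetchDataInfo

// Assumed caller-side handling of the new return type (illustrative only).
def summarizeRead(fetchInfo: FetchDataInfo): String =
  "fetched %d bytes starting at offset metadata %s".format(
    fetchInfo.messageSet.sizeInBytes, fetchInfo.fetchOffset)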
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index afbeffc..c20de4a 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -17,20 +17,22 @@
package kafka.log
+import kafka.common._
+import kafka.message._
+import kafka.utils._
+import kafka.metrics.KafkaMetricsGroup
+
import scala.collection._
import scala.math
import java.nio._
import java.util.Date
import java.io.File
-import kafka.common._
-import kafka.message._
-import kafka.utils._
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
import java.lang.IllegalStateException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
+import com.yammer.metrics.core.Gauge
+
/**
* The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
* A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
@@ -325,7 +327,6 @@ private[log] class Cleaner(val id: Int,
* @param log The log being cleaned
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
- * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
* @param deleteHorizonMs The time to retain delete tombstones
*/
private[log] def cleanSegments(log: Log,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0d6926e..7597d30 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -14,15 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- package kafka.log
+package kafka.log
-import scala.math._
-import java.io.File
import kafka.message._
import kafka.common._
import kafka.utils._
+import kafka.server.{LogOffsetMetadata, FetchDataInfo}
-/**
+import scala.math._
+import java.io.File
+
+
+ /**
* A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
* the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
* segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
@@ -86,7 +89,7 @@ class LogSegment(val log: FileMessageSet,
* Find the physical file position for the first message with offset >= the requested offset.
*
* The lowerBound argument is an optimization that can be used if we already know a valid starting position
- * in the file higher than the greast-lower-bound from the index.
+ * in the file higher than the greatest-lower-bound from the index.
*
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
@@ -99,7 +102,7 @@ class LogSegment(val log: FileMessageSet,
val mapping = index.lookup(offset)
log.searchFor(offset, max(mapping.position, startingFilePosition))
}
-
+
/**
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
@@ -108,22 +111,27 @@ class LogSegment(val log: FileMessageSet,
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
*
- * @return The message set read or null if the startOffset is larger than the largest offset in this log.
+ * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
+ * or null if the startOffset is larger than the largest offset in this log
*/
@threadsafe
- def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = {
+ def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
if(maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
- if(maxSize == 0)
- return MessageSet.Empty
-
+
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
val startPosition = translateOffset(startOffset)
-
+
// if the start position is already off the end of the log, return null
if(startPosition == null)
return null
-
+
+ val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
+
+ // if the size is zero, still return a log segment but with zero size
+ if(maxSize == 0)
+ return FetchDataInfo(offsetMetadata, MessageSet.Empty)
+
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
maxOffset match {
@@ -143,7 +151,7 @@ class LogSegment(val log: FileMessageSet,
min(endPosition - startPosition.position, maxSize)
}
}
- log.read(startPosition.position, length)
+ FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
/**
@@ -222,7 +230,7 @@ class LogSegment(val log: FileMessageSet,
if(ms == null) {
baseOffset
} else {
- ms.lastOption match {
+ ms.messageSet.lastOption match {
case None => baseOffset
case Some(last) => last.nextOffset
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3b15254..2e9532e 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,21 +18,22 @@
package kafka.server
import kafka.cluster.Broker
-import collection.mutable
-import scala.collection.Set
-import scala.collection.Map
-import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
import kafka.utils.{Pool, ShutdownableThread}
import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
import kafka.utils.Utils.inLock
+import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
+import kafka.metrics.KafkaMetricsGroup
+
+import scala.collection.mutable
+import scala.collection.Set
+import scala.collection.Map
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
+import com.yammer.metrics.core.Gauge
/**
* Abstract class for fetching data from multiple partitions from the same broker.
@@ -92,12 +93,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
val partitionsWithError = new mutable.HashSet[TopicAndPartition]
var response: FetchResponse = null
try {
- trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+ trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
response = simpleConsumer.fetch(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning.get) {
- warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.getMessage))
+ warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
partitionMapLock synchronized {
partitionsWithError ++= partitionMap.keys
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
new file mode 100644
index 0000000..e0f14e2
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.network.RequestChannel
+import kafka.api.{FetchResponse, FetchRequest}
+import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition}
+
+import scala.collection.immutable.Map
+import scala.collection.Seq
+
+/**
+ * A delayed fetch request, which is satisfied (or more
+ * accurately, unblocked) -- if:
+ * Case A: This broker is no longer the leader for some partitions it tries to fetch
+ * - should return whatever data is available for the remaining partitions.
+ * Case B: This broker does not know of some partitions it tries to fetch
+ * - should return whatever data is available for the remaining partitions.
+ * Case C: The fetch offset is not on the last segment of the log
+ * - should return all the data on that segment.
+ * Case D: The accumulated bytes from all the fetching partitions exceed the minimum bytes
+ * - should return whatever data is available.
+ */
+
+class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey],
+ override val request: RequestChannel.Request,
+ override val delayMs: Long,
+ val fetch: FetchRequest,
+ private val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata])
+ extends DelayedRequest(keys, request, delayMs) {
+
+ def isSatisfied(replicaManager: ReplicaManager) : Boolean = {
+ var accumulatedSize = 0
+ val fromFollower = fetch.isFromFollower
+ partitionFetchOffsets.foreach {
+ case (topicAndPartition, fetchOffset) =>
+ try {
+ if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
+ val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
+ val endOffset =
+ if (fromFollower)
+ replica.logEndOffset
+ else
+ replica.highWatermark
+
+ if (endOffset.offsetOnOlderSegment(fetchOffset)) {
+ // Case C, this can happen when a new follower replica is fetching from a truncated leader
+ debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition))
+ return true
+ } else if (fetchOffset.offsetOnOlderSegment(endOffset)) {
+ // Case C, this can happen when the follower replica is lagging too much
+ debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch))
+ return true
+ } else if (fetchOffset.precedes(endOffset)) {
+ accumulatedSize += endOffset.positionDiff(fetchOffset)
+ }
+ }
+ } catch {
+ case utpe: UnknownTopicOrPartitionException => // Case A
+ debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetch))
+ return true
+ case nle: NotLeaderForPartitionException => // Case B
+ debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetch))
+ return true
+ }
+ }
+
+ // Case D
+ accumulatedSize >= fetch.minBytes
+ }
+
+ def respond(replicaManager: ReplicaManager): FetchResponse = {
+ val topicData = replicaManager.readMessageSets(fetch)
+ FetchResponse(fetch.correlationId, topicData.mapValues(_.data))
+ }
+}
\ No newline at end of file
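
Cases A and B above are signalled by exceptions from the replica manager, while cases C and D come down to comparing segment base offsets and byte positions of the fetch offset against the relevant end offset (the log end offset for followers, the high watermark for consumers). A simplified, self-contained restatement of that per-partition check, using plain numbers instead of LogOffsetMetadata (the names here are illustrative only):

// segmentBase: base offset of the log segment the offset sits on; bytePos: byte position within that segment
case class FetchPosition(segmentBase: Long, bytePos: Int)

// None        => satisfy the delayed fetch immediately (Case C: the two offsets are on different segments)
// Some(bytes) => bytes this partition contributes towards fetch.minBytes (Case D)
def availableBytes(fetchPos: FetchPosition, endPos: FetchPosition): Option[Int] =
  if (fetchPos.segmentBase != endPos.segmentBase) None
  else Some(math.max(endPos.bytePos - fetchPos.bytePos, 0))

// The request is satisfied once any partition yields None, or once the Some(...) values
// summed over all partitions reach fetch.minBytes.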
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
new file mode 100644
index 0000000..9481508
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.api._
+import kafka.common.ErrorMapping
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+import kafka.network.RequestChannel
+
+import scala.Some
+import scala.collection.immutable.Map
+import scala.collection.Seq
+
+/** A delayed produce request, which is satisfied (or more
+ * accurately, unblocked) -- if for every partition it produces to:
+ * Case A: This broker is not the leader: unblock - should return error.
+ * Case B: This broker is the leader:
+ * B.1 - If there was a localError (when writing to the local log): unblock - should return error
+ * B.2 - else, at least requiredAcks replicas should be caught up to this request.
+ */
+
+class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey],
+ override val request: RequestChannel.Request,
+ override val delayMs: Long,
+ val produce: ProducerRequest,
+ val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
+ val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
+ extends DelayedRequest(keys, request, delayMs) with Logging {
+
+ // first update the acks pending variable according to the error code
+ partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
+ if (delayedStatus.responseStatus.error == ErrorMapping.NoError) {
+ // Timeout error state will be cleared when required acks are received
+ delayedStatus.acksPending = true
+ delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode
+ } else {
+ delayedStatus.acksPending = false
+ }
+
+ trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
+ }
+
+ def respond(offsetManager: OffsetManager): RequestOrResponse = {
+ val responseStatus = partitionStatus.mapValues(status => status.responseStatus)
+
+ val errorCode = responseStatus.find { case (_, status) =>
+ status.error != ErrorMapping.NoError
+ }.map(_._2.error).getOrElse(ErrorMapping.NoError)
+
+ if (errorCode == ErrorMapping.NoError) {
+ offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
+ }
+
+ val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize))
+ .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
+
+ response
+ }
+
+ def isSatisfied(replicaManager: ReplicaManager) = {
+ // check for each partition if it still has pending acks
+ partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
+ trace("Checking producer request satisfaction for %s, acksPending = %b"
+ .format(topicAndPartition, fetchPartitionStatus.acksPending))
+ // skip those partitions that have already been satisfied
+ if (fetchPartitionStatus.acksPending) {
+ val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
+ val (hasEnough, errorCode) = partitionOpt match {
+ case Some(partition) =>
+ partition.checkEnoughReplicasReachOffset(
+ fetchPartitionStatus.requiredOffset,
+ produce.requiredAcks)
+ case None =>
+ (false, ErrorMapping.UnknownTopicOrPartitionCode)
+ }
+ if (errorCode != ErrorMapping.NoError) {
+ fetchPartitionStatus.acksPending = false
+ fetchPartitionStatus.responseStatus.error = errorCode
+ } else if (hasEnough) {
+ fetchPartitionStatus.acksPending = false
+ fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
+ }
+ }
+ }
+
+ // unblocked if there are no partitions with pending acks
+ val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
+ satisfied
+ }
+}
+
+case class DelayedProduceResponseStatus(val requiredOffset: Long,
+ val responseStatus: ProducerResponseStatus) {
+ @volatile var acksPending = false
+
+ override def toString =
+ "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
+ acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+}
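
The satisfaction check above delegates to Partition.checkEnoughReplicasReachOffset, shown earlier in this diff. Its acknowledgement rule can be summarized in a small self-contained sketch; the function and parameter names below are illustrative, not part of the patch.

// requiredOffset is the offset right after the last message of the produce request.
def enoughAcks(requiredAcks: Int,
               requiredOffset: Long,
               leaderHighWatermark: Long,
               followerIsrLogEndOffsets: Seq[Long]): Boolean = {
  // the local leader replica always counts as one ack
  val numAcks = 1 + followerIsrLogEndOffsets.count(_ >= requiredOffset)
  if (requiredAcks < 0)
    leaderHighWatermark >= requiredOffset   // acks = -1: wait until the ISR-wide high watermark passes the request
  else
    numAcks >= requiredAcks                 // acks = n: wait until n replicas have the data
}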
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/DelayedRequestKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedRequestKey.scala b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
new file mode 100644
index 0000000..628ef59
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Keys used for delayed request metrics recording
+ */
+trait DelayedRequestKey {
+ def keyLabel: String
+}
+
+object DelayedRequestKey {
+ val globalLabel = "All"
+}
+
+case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey {
+
+ def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
+
+ override def keyLabel = "%s-%d".format(topic, partition)
+}
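
A quick usage note, assuming the class exactly as defined above (e.g., from a test or the REPL): the key label is simply "topic-partition", matching the trait's stated purpose of labelling delayed-request metrics.

val key = TopicPartitionRequestKey("payments", 3)   // "payments" and 3 are example values
require(key.keyLabel == "payments-3")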
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/FetchDataInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
new file mode 100644
index 0000000..26f278f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.message.MessageSet
+
+case class FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
new file mode 100644
index 0000000..ed13188
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.network.RequestChannel
+import kafka.api.FetchResponseSend
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed fetch requests
+ */
+class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
+ extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
+ this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+ private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
+ private val metricPrefix = if (forFollower) "Follower" else "Consumer"
+
+ val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+ }
+
+ private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
+ private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
+
+ private def recordDelayedFetchExpired(forFollower: Boolean) {
+ val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+ else aggregateNonFollowerFetchRequestMetrics
+
+ metrics.expiredRequestMeter.mark()
+ }
+
+ /**
+ * Check if a specified delayed fetch request is satisfied
+ */
+ def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
+
+ /**
+ * When a delayed fetch request expires just answer it with whatever data is present
+ */
+ def expire(delayedFetch: DelayedFetch) {
+ debug("Expiring fetch request %s.".format(delayedFetch.fetch))
+ val fromFollower = delayedFetch.fetch.isFromFollower
+ recordDelayedFetchExpired(fromFollower)
+ respond(delayedFetch)
+ }
+
+ // TODO: purgatory should not be responsible for sending back the responses
+ def respond(delayedFetch: DelayedFetch) {
+ val response = delayedFetch.respond(replicaManager)
+ requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fd5f12e..bb94673 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -23,13 +23,10 @@ import kafka.log._
import kafka.message._
import kafka.network._
import kafka.admin.AdminUtils
-import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel.Response
import kafka.controller.KafkaController
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{SystemTime, Logging}
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
import scala.collection._
import org.I0Itec.zkclient.ZkClient
@@ -45,11 +42,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val config: KafkaConfig,
val controller: KafkaController) extends Logging {
- private val producerRequestPurgatory =
- new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
- private val fetchRequestPurgatory =
- new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
- private val delayedRequestMetrics = new DelayedRequestMetrics
+ val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel)
+ val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel)
+ // TODO: the following line will be removed in 0.9
+ replicaManager.initWithRequestPurgatory(producerRequestPurgatory, fetchRequestPurgatory)
var metadataCache = new MetadataCache
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
@@ -127,22 +123,6 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
}
- /**
- * Check if a partitionData from a produce request can unblock any
- * DelayedFetch requests.
- */
- def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
- val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
- trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
-
- // send any newly unblocked responses
- for(fetchReq <- satisfied) {
- val topicData = readMessageSets(fetchReq.fetch)
- val response = FetchResponse(fetchReq.fetch.correlationId, topicData)
- requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
- }
- }
-
private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = {
val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map {
case (topicAndPartition, offset) =>
@@ -171,27 +151,21 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle a produce request or offset commit request (which is really a specialized producer request)
*/
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
-
- val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) {
- val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
- (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
- }
- else {
- (request.requestObj.asInstanceOf[ProducerRequest], None)
- }
+ val (produceRequest, offsetCommitRequestOpt) =
+ if (request.requestId == RequestKeys.OffsetCommitKey) {
+ val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+ (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
+ } else {
+ (request.requestObj.asInstanceOf[ProducerRequest], None)
+ }
val sTime = SystemTime.milliseconds
val localProduceResults = appendToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
val numPartitionsInError = localProduceResults.count(_.error.isDefined)
- produceRequest.data.foreach(partitionAndData =>
- maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
-
- val allPartitionHaveReplicationFactorOne =
- !produceRequest.data.keySet.exists(
- m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
if(produceRequest.requiredAcks == 0) {
// no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
// no response is expected by the producer the handler will send a close connection response to the socket server
@@ -214,7 +188,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} else if (produceRequest.requiredAcks == 1 ||
produceRequest.numPartitions <= 0 ||
- allPartitionHaveReplicationFactorOne ||
numPartitionsInError == produceRequest.numPartitions) {
if (firstErrorCode == ErrorMapping.NoError) {
@@ -229,46 +202,27 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
val producerRequestKeys = produceRequest.data.keys.map(
- topicAndPartition => new RequestKey(topicAndPartition)).toSeq
+ topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq
val statuses = localProduceResults.map(r =>
r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
val delayedRequest = new DelayedProduce(
producerRequestKeys,
request,
- statuses,
- produceRequest,
produceRequest.ackTimeoutMs.toLong,
+ produceRequest,
+ statuses,
offsetCommitRequestOpt)
- producerRequestPurgatory.watch(delayedRequest)
-
- /*
- * Replica fetch requests may have arrived (and potentially satisfied)
- * delayedProduce requests while they were being added to the purgatory.
- * Here, we explicitly check if any of them can be satisfied.
- */
- var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
- producerRequestKeys.foreach(key =>
- satisfiedProduceRequests ++=
- producerRequestPurgatory.update(key, key))
- debug(satisfiedProduceRequests.size +
- " producer requests unblocked during produce to local log.")
- satisfiedProduceRequests.foreach(_.respond())
-
- // we do not need the data anymore
- produceRequest.emptyData()
+      // add the produce request to the purgatory's watch list if it is not yet satisfied; otherwise send the response back
+ val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
+ if (satisfiedByMe)
+ producerRequestPurgatory.respond(delayedRequest)
}
- }
-
- case class DelayedProduceResponseStatus(requiredOffset: Long,
- status: ProducerResponseStatus) {
- var acksPending = false
- override def toString =
- "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
- acksPending, status.error, status.offset, requiredOffset)
+ // we do not need the data anymore
+ produceRequest.emptyData()
}
-
+
case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
def this(key: TopicAndPartition, throwable: Throwable) =
this(key, -1L, -1L, Some(throwable))
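
The delayed-produce path above now relies on a single checkAndMaybeWatch call: if the operation is already satisfiable at that moment, the caller sends the response itself; otherwise the purgatory keeps it under watch. A simplified, self-contained model of that idiom (the MiniPurgatory and Delayed names are illustrative, not the actual API):

    object CheckAndMaybeWatchDemo extends App {
      // Illustrative model of the check-and-maybe-watch idiom; not the real Kafka types.
      trait Delayed { def isSatisfiedNow: Boolean }

      class MiniPurgatory[T <: Delayed] {
        private val watched = scala.collection.mutable.ListBuffer.empty[T]

        // Returns true if the caller observed the operation as satisfied and therefore
        // owns sending the response; otherwise the operation is parked for later checks.
        def checkAndMaybeWatch(op: T): Boolean = synchronized {
          if (op.isSatisfiedNow) true
          else { watched += op; false }
        }

        def numWatched: Int = synchronized(watched.size)
      }

      var followerAcks = 0
      val delayedProduce = new Delayed { def isSatisfiedNow: Boolean = followerAcks >= 2 }

      val purgatory = new MiniPurgatory[Delayed]
      val satisfiedByMe = purgatory.checkAndMaybeWatch(delayedProduce)
      println(s"respond immediately? $satisfiedByMe, watched = ${purgatory.numWatched}")
    }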
@@ -288,13 +242,12 @@ class KafkaApis(val requestChannel: RequestChannel,
partitionAndData.map {case (topicAndPartition, messages) =>
try {
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
- val info =
- partitionOpt match {
- case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
- case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
- .format(topicAndPartition, brokerId))
-
- }
+ val info = partitionOpt match {
+ case Some(partition) =>
+ partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+ case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+ .format(topicAndPartition, brokerId))
+ }
val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
@@ -338,121 +291,58 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
- if(fetchRequest.isFromFollower) {
- maybeUpdatePartitionHw(fetchRequest)
- // after updating HW, some delayed produce requests may be unblocked
- var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
- fetchRequest.requestInfo.foreach {
- case (topicAndPartition, _) =>
- val key = new RequestKey(topicAndPartition)
- satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
- }
- debug("Replica %d fetch unblocked %d producer requests."
- .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
- satisfiedProduceRequests.foreach(_.respond())
- }
-
- val dataRead = readMessageSets(fetchRequest)
- val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum
+ val dataRead = replicaManager.readMessageSets(fetchRequest)
+
+    // if the fetch request comes from a follower,
+    // update its corresponding log end offset
+ if(fetchRequest.isFromFollower)
+ recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))
+
+ // check if this fetch request can be satisfied right away
+ val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
+ val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) =>
+ errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError))
+    // send the data immediately if 1) the fetch request does not want to wait,
+    //                              2) the fetch request does not require any data,
+    //                              3) there is already enough data to respond, or
+    //                              4) an error occurred while reading the data
if(fetchRequest.maxWait <= 0 ||
+ fetchRequest.numPartitions <= 0 ||
bytesReadable >= fetchRequest.minBytes ||
- fetchRequest.numPartitions <= 0) {
+ errorReadingData) {
debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
- .format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
- val response = new FetchResponse(fetchRequest.correlationId, dataRead)
+ .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
+ val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else {
debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
fetchRequest.clientId))
// create a list of (topic, partition) pairs to use as keys for this delayed request
- val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
- val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable)
- fetchRequestPurgatory.watch(delayedFetch)
+ val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_))
+ val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
+ dataRead.mapValues(_.offset))
+
+      // add the fetch request to the purgatory's watch list if it is not yet satisfied; otherwise send the response back
+ val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch)
+ if (satisfiedByMe)
+ fetchRequestPurgatory.respond(delayedFetch)
}
}
- private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) {
- debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
- fetchRequest.requestInfo.foreach(info => {
- val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset)
- replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
- })
- }
+ private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
+ debug("Record follower log end offsets: %s ".format(offsets))
+ offsets.foreach {
+ case (topicAndPartition, offset) =>
+ replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
+ topicAndPartition.partition, replicaId, offset)
- /**
- * Read from all the offset details given and return a map of
- * (topic, partition) -> PartitionData
- */
- private def readMessageSets(fetchRequest: FetchRequest) = {
- val isFetchFromFollower = fetchRequest.isFromFollower
- fetchRequest.requestInfo.map
- {
- case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
- val partitionData =
- try {
- val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
- BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
- if (!isFetchFromFollower) {
- new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
- } else {
- debug("Leader %d for partition [%s,%d] received fetch request from follower %d"
- .format(brokerId, topic, partition, fetchRequest.replicaId))
- new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
- }
- } catch {
- // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
- // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
- // for a partition it is the leader for
- case utpe: UnknownTopicOrPartitionException =>
- warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
- fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
- new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
- case nle: NotLeaderForPartitionException =>
- warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
- fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
- new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
- case t: Throwable =>
- BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
- BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
- error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
- .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
- new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
- }
- (TopicAndPartition(topic, partition), partitionData)
+        // for producer requests that wait on replication (acks == -1 or acks > 1), we need to check
+        // if they can be unblocked now that this follower's log end offset has moved
+ replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))
}
}
/**
- * Read from a single topic/partition at the given offset upto maxSize bytes
- */
- private def readMessageSet(topic: String,
- partition: Int,
- offset: Long,
- maxSize: Int,
- fromReplicaId: Int): (MessageSet, Long) = {
- // check if the current broker is the leader for the partitions
- val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
- replicaManager.getReplicaOrException(topic, partition)
- else
- replicaManager.getLeaderReplicaIfLocal(topic, partition)
- trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
- val maxOffsetOpt =
- if (Request.isValidBrokerId(fromReplicaId))
- None
- else
- Some(localReplica.highWatermark)
- val messages = localReplica.log match {
- case Some(log) =>
- log.read(offset, maxSize, maxOffsetOpt)
- case None =>
- error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic, partition, brokerId))
- MessageSet.Empty
- }
- (messages, localReplica.highWatermark)
- }
-
- /**
* Service the offset request API
*/
def handleOffsetRequest(request: RequestChannel.Request) {
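
The rewritten handleFetchRequest answers at once when waiting would be pointless (maxWait <= 0 or no partitions requested), when minBytes is already met, or when an error occurred while reading, and only otherwise parks a DelayedFetch. A small sketch of that decision, with a stand-in case class for the relevant request fields rather than the actual Kafka types:

    object FetchDecisionDemo extends App {
      // Stand-in for the handful of request/read-result fields the decision needs.
      final case class FetchDecisionInput(maxWaitMs: Int,
                                          numPartitions: Int,
                                          minBytes: Int,
                                          bytesReadable: Long,
                                          errorReadingData: Boolean)

      def respondImmediately(in: FetchDecisionInput): Boolean =
        in.maxWaitMs <= 0 ||                  // the client does not want to wait
        in.numPartitions <= 0 ||              // nothing was actually requested
        in.bytesReadable >= in.minBytes ||    // enough data is already available
        in.errorReadingData                   // errors are reported right away

      // a long-poll fetch with too little data accumulated so far gets delayed
      val longPoll = FetchDecisionInput(maxWaitMs = 500, numPartitions = 3, minBytes = 1024,
                                        bytesReadable = 100L, errorReadingData = false)
      println(s"respond immediately? ${respondImmediately(longPoll)}") // false
    }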
@@ -473,7 +363,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!offsetRequest.isFromOrdinaryClient) {
allOffsets
} else {
- val hw = localReplica.highWatermark
+ val hw = localReplica.highWatermark.messageOffset
if (allOffsets.exists(_ > hw))
hw +: allOffsets.dropWhile(_ > hw)
else
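
The one-line change above only reflects that the high watermark is now carried inside a LogOffsetMetadata value; the surrounding logic is unchanged: ordinary clients only ever see offsets at or below the leader's high watermark. Assuming, as in the handler above, that allOffsets is sorted in descending order, the capping behaves like this small sketch:

    object OffsetCapDemo extends App {
      // allOffsets is assumed to be sorted in descending order, as in the handler above.
      def capAtHighWatermark(allOffsets: Seq[Long], hw: Long): Seq[Long] =
        if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw)
        else allOffsets

      // an offset beyond the high watermark is replaced by the high watermark itself
      println(capAtHighWatermark(Seq(120L, 90L, 0L), hw = 100L)) // List(100, 90, 0)
    }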
@@ -643,209 +533,5 @@ class KafkaApis(val requestChannel: RequestChannel,
producerRequestPurgatory.shutdown()
debug("Shut down complete.")
}
-
- private [kafka] trait MetricKey {
- def keyLabel: String
- }
- private [kafka] object MetricKey {
- val globalLabel = "All"
- }
-
- private [kafka] case class RequestKey(topic: String, partition: Int)
- extends MetricKey {
-
- def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
-
- def topicAndPartition = TopicAndPartition(topic, partition)
-
- override def keyLabel = "%s-%d".format(topic, partition)
- }
-
- /**
- * A delayed fetch request
- */
- class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long)
- extends DelayedRequest(keys, request, delayMs) {
- val bytesAccumulated = new AtomicLong(initialSize)
- }
-
- /**
- * A holding pen for fetch requests waiting to be satisfied
- */
- class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
- extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
- this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
-
- /**
- * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
- */
- def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
- val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
- accumulatedSize >= delayedFetch.fetch.minBytes
- }
-
- /**
- * When a request expires just answer it with whatever data is present
- */
- def expire(delayed: DelayedFetch) {
- debug("Expiring fetch request %s.".format(delayed.fetch))
- try {
- val topicData = readMessageSets(delayed.fetch)
- val response = FetchResponse(delayed.fetch.correlationId, topicData)
- val fromFollower = delayed.fetch.isFromFollower
- delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
- requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
- }
- catch {
- case e1: LeaderNotAvailableException =>
- debug("Leader changed before fetch request %s expired.".format(delayed.fetch))
- case e2: UnknownTopicOrPartitionException =>
- debug("Replica went offline before fetch request %s expired.".format(delayed.fetch))
- }
- }
- }
-
- class DelayedProduce(keys: Seq[RequestKey],
- request: RequestChannel.Request,
- val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus],
- produce: ProducerRequest,
- delayMs: Long,
- offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
- extends DelayedRequest(keys, request, delayMs) with Logging {
-
- // first update the acks pending variable according to the error code
- partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
- if (delayedStatus.status.error == ErrorMapping.NoError) {
- // Timeout error state will be cleared when requiredAcks are received
- delayedStatus.acksPending = true
- delayedStatus.status.error = ErrorMapping.RequestTimedOutCode
- } else {
- delayedStatus.acksPending = false
- }
-
- trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
- }
-
- def respond() {
- val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
- topicAndPartition -> delayedStatus.status
- }
-
- val errorCode = responseStatus.find { case (_, status) =>
- status.error != ErrorMapping.NoError
- }.map(_._2.error).getOrElse(ErrorMapping.NoError)
-
- if (errorCode == ErrorMapping.NoError) {
- offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
- }
-
- val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, config.offsetMetadataMaxSize))
- .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
-
- requestChannel.sendResponse(new RequestChannel.Response(
- request, new BoundedByteBufferSend(response)))
- }
-
- /**
- * Returns true if this delayed produce request is satisfied (or more
- * accurately, unblocked) -- this is the case if for every partition:
- * Case A: This broker is not the leader: unblock - should return error.
- * Case B: This broker is the leader:
- * B.1 - If there was a localError (when writing to the local log): unblock - should return error
- * B.2 - else, at least requiredAcks replicas should be caught up to this request.
- *
- * As partitions become acknowledged, we may be able to unblock
- * DelayedFetchRequests that are pending on those partitions.
- */
- def isSatisfied(followerFetchRequestKey: RequestKey) = {
- val topic = followerFetchRequestKey.topic
- val partitionId = followerFetchRequestKey.partition
- val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId))
- trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
- .format(topic, partitionId, fetchPartitionStatus.acksPending))
- if (fetchPartitionStatus.acksPending) {
- val partitionOpt = replicaManager.getPartition(topic, partitionId)
- val (hasEnough, errorCode) = partitionOpt match {
- case Some(partition) =>
- partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
- case None =>
- (false, ErrorMapping.UnknownTopicOrPartitionCode)
- }
- if (errorCode != ErrorMapping.NoError) {
- fetchPartitionStatus. acksPending = false
- fetchPartitionStatus.status.error = errorCode
- } else if (hasEnough) {
- fetchPartitionStatus.acksPending = false
- fetchPartitionStatus.status.error = ErrorMapping.NoError
- }
- if (!fetchPartitionStatus.acksPending) {
- val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
- maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
- }
- }
-
- // unblocked if there are no partitions with pending acks
- val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
- trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
- satisfied
- }
- }
-
- /**
- * A holding pen for produce requests waiting to be satisfied.
- */
- private [kafka] class ProducerRequestPurgatory(purgeInterval: Int)
- extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) {
- this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
-
- protected def checkSatisfied(followerFetchRequestKey: RequestKey,
- delayedProduce: DelayedProduce) =
- delayedProduce.isSatisfied(followerFetchRequestKey)
-
- /**
- * Handle an expired delayed request
- */
- protected def expire(delayedProduce: DelayedProduce) {
- for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
- delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(topicPartition.topic, topicPartition.partition))
-
- delayedProduce.respond()
- }
- }
-
- private class DelayedRequestMetrics {
- private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
- val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
- }
-
-
- private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
- private val metricPrefix = if (forFollower) "Follower" else "Consumer"
-
- val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
- }
-
- private val producerRequestMetricsForKey = {
- val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
- new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory))
- }
-
- private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
-
- private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
- private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
-
- def recordDelayedProducerKeyExpired(key: MetricKey) {
- val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
- List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
- }
-
- def recordDelayedFetchExpired(forFollower: Boolean) {
- val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
- else aggregateNonFollowerFetchRequestMetrics
-
- metrics.expiredRequestMeter.mark()
- }
- }
}
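
For reference, the DelayedProduce logic removed above tracked, per partition, whether acknowledgements were still pending and cleared that flag once enough replicas had reached the required offset (or an error was recorded); the request as a whole was satisfied once no partition was still pending. A self-contained sketch of that bookkeeping, using illustrative types rather than the real Kafka classes:

    object DelayedProduceSatisfactionDemo extends App {
      // Illustrative model of the per-partition acknowledgement state; not the real Kafka types.
      final case class PartitionAckStatus(requiredOffset: Long,
                                          var acksPending: Boolean = true,
                                          var errorCode: Int = 0)

      // Clear the pending flag once enough replicas have caught up to the required offset.
      def updateStatus(status: PartitionAckStatus, replicaEndOffsets: Seq[Long], requiredAcks: Int): Unit = {
        val caughtUp = replicaEndOffsets.count(_ >= status.requiredOffset)
        if (caughtUp >= requiredAcks) {
          status.acksPending = false
          status.errorCode = 0
        }
      }

      val partitions = Map(
        "topicA-0" -> PartitionAckStatus(requiredOffset = 42L),
        "topicA-1" -> PartitionAckStatus(requiredOffset = 17L))

      updateStatus(partitions("topicA-0"), replicaEndOffsets = Seq(45L, 43L), requiredAcks = 2)
      updateStatus(partitions("topicA-1"), replicaEndOffsets = Seq(20L, 10L), requiredAcks = 2)

      // the produce request is unblocked only when no partition is still waiting for acks
      val satisfied = !partitions.values.exists(_.acksPending)
      println(s"produce request satisfied: $satisfied") // false: topicA-1 still pending
    }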