Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/20 10:10:37 UTC
kafka git commit: KAFKA-4313; ISRs may thrash when replication quota is enabled
Repository: kafka
Updated Branches:
refs/heads/trunk 4c295a784 -> 24067e407
KAFKA-4313; ISRs may thrash when replication quota is enabled
Author: Jun Rao <ju...@gmail.com>
Reviewers: Ben Stopford <be...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #2043 from junrao/kafka-4313
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24067e40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24067e40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24067e40
Branch: refs/heads/trunk
Commit: 24067e40764d91e1a6b2d80be45407841bbb72a2
Parents: 4c295a7
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Oct 20 10:38:11 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Oct 20 10:45:31 2016 +0100
----------------------------------------------------------------------
.../kafka/server/AbstractFetcherThread.scala | 8 +++
.../main/scala/kafka/server/DelayedFetch.scala | 17 +++--
.../kafka/server/ReplicaFetcherThread.scala | 12 +++-
.../scala/kafka/server/ReplicaManager.scala | 30 ++++++--
.../kafka/server/ReplicaManagerQuotasTest.scala | 74 +++++++++++++++++---
.../unit/kafka/server/SimpleFetchTest.scala | 18 ++++-
6 files changed, 133 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
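The gist of the fix: a replica in the throttled list is now exempt from throttling while it is in sync, on both the leader and the follower. Previously the check was just quota.isThrottled(tp) && quota.isQuotaExceeded, so a fully caught-up replica could be excluded from fetches, fall out of the ISR, thereby stop being throttled, catch back up, rejoin the ISR, and repeat; that is the thrash in the title. A minimal sketch of the shared predicate (names follow the diff below; ReplicaQuota is assumed to expose isThrottled and isQuotaExceeded):

    def shouldThrottle(quota: ReplicaQuota, tp: TopicAndPartition, isReplicaInSync: Boolean): Boolean =
      // Hold a replica back only when all three conditions hold; an in-sync
      // replica is never starved, which is what keeps ISR membership stable.
      quota.isThrottled(tp) && quota.isQuotaExceeded && !isReplicaInSync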
http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2f2cb4b..30f5125 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -304,6 +304,14 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
}
+ def isReplicaInSync(topic: String, partitionId: Int): Boolean = {
+ val fetcherLagMetrics = stats.get(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+ if (fetcherLagMetrics != null)
+ fetcherLagMetrics.lag <= 0
+ else
+ false
+ }
+
def unregister(topic: String, partitionId: Int) {
val lagMetrics = stats.remove(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
if (lagMetrics != null) lagMetrics.unregister()
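A follower has no direct view of the leader's ISR, so isReplicaInSync approximates it from the per-partition fetcher lag metric: a lag of zero or less counts as in sync, and a partition with no lag metric conservatively counts as out of sync. Usage is a single lookup (topic and partition below are hypothetical):

    // fetcherLagStats is the FetcherLagStats instance owned by the fetcher thread
    val inSync = fetcherLagStats.isReplicaInSync("my-topic", 0)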
http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 4b17e81..2feeae8 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -42,6 +42,7 @@ case class FetchMetadata(fetchMinBytes: Int,
fetchOnlyLeader: Boolean,
fetchOnlyCommitted: Boolean,
isFromFollower: Boolean,
+ replicaId: Int,
fetchPartitionStatus: Seq[(TopicAndPartition, FetchPartitionStatus)]) {
override def toString = "[minBytes: " + fetchMinBytes + ", " +
@@ -97,7 +98,8 @@ class DelayedFetch(delayMs: Long,
// Case C, this can happen when the fetch operation is falling behind the current segment
// or the partition has just rolled a new segment
debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
- if (!(quota.isThrottled(topicAndPartition) && quota.isQuotaExceeded()))
+ // We will not force complete the fetch request if a replica should be throttled.
+ if (!replicaManager.shouldLeaderThrottle(quota, topicAndPartition, fetchMetadata.replicaId))
return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
// we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
@@ -139,12 +141,13 @@ class DelayedFetch(delayMs: Long,
*/
override def onComplete() {
val logReadResults = replicaManager.readFromLocalLog(
- fetchMetadata.fetchOnlyLeader,
- fetchMetadata.fetchOnlyCommitted,
- fetchMetadata.fetchMaxBytes,
- fetchMetadata.hardMaxBytesLimit,
- fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
- quota
+ replicaId = fetchMetadata.replicaId,
+ fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
+ readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
+ fetchMaxBytes = fetchMetadata.fetchMaxBytes,
+ hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
+ readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
+ quota = quota
)
val fetchPartitionData = logReadResults.map { case (tp, result) =>
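Threading replicaId through FetchMetadata lets the delayed-fetch path ask the throttling question for the specific follower that issued the fetch. Only real followers can be throttled: consumer fetches carry a negative sentinel id rather than a broker id, the same distinction ReplicaManager draws with Request.isValidBrokerId below. A rough sketch, assuming the sentinel constants in kafka.api.Request:

    // A follower fetch carries its broker id (>= 0); consumers use a negative
    // sentinel such as Request.OrdinaryConsumerId, so isValidBrokerId tells
    // the two apart when deciding whether the replication quota applies.
    val isFromFollower = Request.isValidBrokerId(fetchMetadata.replicaId)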
http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 7930716..6f4c589 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -286,10 +286,10 @@ class ReplicaFetcherThread(name: String,
protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
- val quotaExceeded = quota.isQuotaExceeded
partitionMap.foreach { case (topicPartition, partitionFetchState) =>
val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
- if (partitionFetchState.isActive && !(quota.isThrottled(topicAndPartition) && quotaExceeded))
+ // We will not include a replica in the fetch request if it should be throttled.
+ if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicAndPartition))
requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
}
@@ -300,6 +300,14 @@ class ReplicaFetcherThread(name: String,
new FetchRequest(request)
}
+ /**
+ * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+ * the quota is exceeded and the replica is not in sync.
+ */
+ private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition): Boolean = {
+ val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition.topic, topicPartition.partition)
+ quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+ }
}
object ReplicaFetcherThread {
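For reasoning about (or unit testing) shouldFollowerThrottle, a minimal ReplicaQuota double suffices. This is only a sketch: the real trait in kafka.server may declare further members (the quota manager also records bytes, for instance), so treat the shape below as an assumption:

    class FixedQuota(throttled: Set[TopicAndPartition], exceeded: Boolean) extends ReplicaQuota {
      override def isThrottled(tp: TopicAndPartition): Boolean = throttled.contains(tp)
      override def isQuotaExceeded(): Boolean = exceeded
    }

With such a double, shouldFollowerThrottle reduces to: the partition is in the throttled set, the quota is exceeded, and the local fetcher lag has not yet reached zero.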
http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2c843e8..32bc660 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -466,8 +466,14 @@ class ReplicaManager(val config: KafkaConfig,
val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
// read from local logs
- val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchMaxBytes, hardMaxBytesLimit,
- fetchInfos, quota)
+ val logReadResults = readFromLocalLog(
+ replicaId = replicaId,
+ fetchOnlyFromLeader = fetchOnlyFromLeader,
+ readOnlyCommitted = fetchOnlyCommitted,
+ fetchMaxBytes = fetchMaxBytes,
+ hardMaxBytesLimit = hardMaxBytesLimit,
+ readPartitionInfo = fetchInfos,
+ quota = quota)
// if the fetch comes from the follower,
// update its corresponding log end offset
@@ -498,7 +504,7 @@ class ReplicaManager(val config: KafkaConfig,
(topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
}
val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
- fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
+ fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
@@ -514,7 +520,8 @@ class ReplicaManager(val config: KafkaConfig,
/**
* Read from multiple topic partitions at the given offset up to maxSize bytes
*/
- def readFromLocalLog(fetchOnlyFromLeader: Boolean,
+ def readFromLocalLog(replicaId: Int,
+ fetchOnlyFromLeader: Boolean,
readOnlyCommitted: Boolean,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
@@ -559,8 +566,8 @@ class ReplicaManager(val config: KafkaConfig,
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)
- // If the partition is marked as throttled, and we are over-quota then exclude it
- if (quota.isThrottled(tp) && quota.isQuotaExceeded)
+ // If the partition is being throttled, simply return an empty set.
+ if (shouldLeaderThrottle(quota, tp, replicaId))
FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
@@ -607,6 +614,17 @@ class ReplicaManager(val config: KafkaConfig,
result
}
+ /**
+ * To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list,
+ * the quota is exceeded and the replica is not in sync.
+ */
+ def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition, replicaId: Int): Boolean = {
+ val isReplicaInSync = getPartition(topicPartition.topic, topicPartition.partition).flatMap { partition =>
+ partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
+ }.getOrElse(false)
+ quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+ }
+
def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] =
getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica =>
replica.log.map(_.config.messageFormatVersion.messageFormatVersion)
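On the leader the in-sync test is exact rather than lag-based: the follower's replica must currently be a member of the partition's inSyncReplicas set (an unknown partition or replica counts as out of sync). A worked example under the new predicate, with hypothetical broker ids, a throttled partition p, and the quota currently exceeded:

    // follower 102 is in p's ISR -> shouldLeaderThrottle(quota, p, 102) == false (keep serving it)
    // follower 103 lags behind   -> shouldLeaderThrottle(quota, p, 103) == true  (serve an empty set)

Only the out-of-sync follower is slowed down, so a caught-up replica no longer drops out of the ISR whenever the quota fills up.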
http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f8f8dda..17e0516 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -48,13 +48,21 @@ class ReplicaManagerQuotasTest {
@Test
def shouldExcludeSubsequentThrottledPartitions(): Unit = {
setUpMocks(fetchInfo)
+ val followerReplicaId = configs.last.brokerId
val quota = mockQuota(1000000)
expect(quota.isQuotaExceeded()).andReturn(false).once()
expect(quota.isQuotaExceeded()).andReturn(true).once()
replay(quota)
- val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+ val fetch = replicaManager.readFromLocalLog(
+ replicaId = followerReplicaId,
+ fetchOnlyFromLeader = true,
+ readOnlyCommitted = true,
+ fetchMaxBytes = Int.MaxValue,
+ hardMaxBytesLimit = false,
+ readPartitionInfo = fetchInfo,
+ quota = quota)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
@@ -65,13 +73,21 @@ class ReplicaManagerQuotasTest {
@Test
def shouldGetNoMessagesIfQuotasExceededOnSubsequentPartitions(): Unit = {
setUpMocks(fetchInfo)
+ val followerReplicaId = configs.last.brokerId
val quota = mockQuota(1000000)
expect(quota.isQuotaExceeded()).andReturn(true).once()
expect(quota.isQuotaExceeded()).andReturn(true).once()
replay(quota)
- val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+ val fetch = replicaManager.readFromLocalLog(
+ replicaId = followerReplicaId,
+ fetchOnlyFromLeader = true,
+ readOnlyCommitted = true,
+ fetchMaxBytes = Int.MaxValue,
+ hardMaxBytesLimit = false,
+ readPartitionInfo = fetchInfo,
+ quota = quota)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@@ -81,20 +97,53 @@ class ReplicaManagerQuotasTest {
@Test
def shouldGetBothMessagesIfQuotasAllow(): Unit = {
setUpMocks(fetchInfo)
+ val followerReplicaId = configs.last.brokerId
val quota = mockQuota(1000000)
expect(quota.isQuotaExceeded()).andReturn(false).once()
expect(quota.isQuotaExceeded()).andReturn(false).once()
replay(quota)
- val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+ val fetch = replicaManager.readFromLocalLog(
+ replicaId = followerReplicaId,
+ fetchOnlyFromLeader = true,
+ readOnlyCommitted = true,
+ fetchMaxBytes = Int.MaxValue,
+ hardMaxBytesLimit = false,
+ readPartitionInfo = fetchInfo,
+ quota = quota)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
}
- def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message) {
+ @Test
+ def shouldIncludeInSyncThrottledReplicas(): Unit = {
+ setUpMocks(fetchInfo, bothReplicasInSync = true)
+ val followerReplicaId = configs.last.brokerId
+
+ val quota = mockQuota(1000000)
+ expect(quota.isQuotaExceeded()).andReturn(false).once()
+ expect(quota.isQuotaExceeded()).andReturn(true).once()
+ replay(quota)
+
+ val fetch = replicaManager.readFromLocalLog(
+ replicaId = followerReplicaId,
+ fetchOnlyFromLeader = true,
+ readOnlyCommitted = true,
+ fetchMaxBytes = Int.MaxValue,
+ hardMaxBytesLimit = false,
+ readPartitionInfo = fetchInfo,
+ quota = quota)
+ assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
+ fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
+
+ assertEquals("But we should get the second too since it's throttled but in sync", 1,
+ fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
+ }
+
+ def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message, bothReplicasInSync: Boolean = false) {
val zkUtils = createNiceMock(classOf[ZkUtils])
val scheduler = createNiceMock(classOf[KafkaScheduler])
@@ -131,12 +180,19 @@ class ReplicaManagerQuotasTest {
//create the two replicas
for ((p, _) <- fetchInfo) {
val partition = replicaManager.getOrCreatePartition(p.topic, p.partition)
- val replica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
- replica.highWatermark = new LogOffsetMetadata(5)
- partition.leaderReplicaIdOpt = Some(replica.brokerId)
- val allReplicas = List(replica)
+ val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
+ leaderReplica.highWatermark = new LogOffsetMetadata(5)
+ partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
+ val followerReplica = new Replica(configs.last.brokerId, partition, time, 0, Some(log))
+ val allReplicas = Set(leaderReplica, followerReplica)
allReplicas.foreach(partition.addReplicaIfNotExists(_))
- partition.inSyncReplicas = allReplicas.toSet
+ if (bothReplicasInSync) {
+ partition.inSyncReplicas = allReplicas
+ followerReplica.highWatermark = new LogOffsetMetadata(5)
+ } else {
+ partition.inSyncReplicas = Set(leaderReplica)
+ followerReplica.highWatermark = new LogOffsetMetadata(0)
+ }
}
}
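The reworked mocks now build an explicit leader and follower replica per partition, and the new bothReplicasInSync flag selects the scenario: when true, both replicas are in the ISR and the follower's high watermark matches the leader's (5); when false, only the leader is in sync and the follower sits at offset 0. The new test above flips the flag on, for example:

    setUpMocks(fetchInfo, bothReplicasInSync = true)  // throttled but in-sync followers still get data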
http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 71c2b41..cbd751b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -149,9 +149,23 @@ class SimpleFetchTest {
val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
- replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+ replicaManager.readFromLocalLog(
+ replicaId = Request.OrdinaryConsumerId,
+ fetchOnlyFromLeader = true,
+ readOnlyCommitted = true,
+ fetchMaxBytes = Int.MaxValue,
+ hardMaxBytesLimit = false,
+ readPartitionInfo = fetchInfo,
+ quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
- replicaManager.readFromLocalLog(true, false, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+ replicaManager.readFromLocalLog(
+ replicaId = Request.OrdinaryConsumerId,
+ fetchOnlyFromLeader = true,
+ readOnlyCommitted = false,
+ fetchMaxBytes = Int.MaxValue,
+ hardMaxBytesLimit = false,
+ readPartitionInfo = fetchInfo,
+ quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())