You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/20 10:10:37 UTC

kafka git commit: KAFKA-4313; ISRs may thrash when replication quota is enabled

Repository: kafka
Updated Branches:
  refs/heads/trunk 4c295a784 -> 24067e407


KAFKA-4313; ISRs may thrash when replication quota is enabled

Author: Jun Rao <ju...@gmail.com>

Reviewers: Ben Stopford <be...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #2043 from junrao/kafka-4313


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24067e40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24067e40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24067e40

Branch: refs/heads/trunk
Commit: 24067e40764d91e1a6b2d80be45407841bbb72a2
Parents: 4c295a7
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Oct 20 10:38:11 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Oct 20 10:45:31 2016 +0100

----------------------------------------------------------------------
 .../kafka/server/AbstractFetcherThread.scala    |  8 +++
 .../main/scala/kafka/server/DelayedFetch.scala  | 17 +++--
 .../kafka/server/ReplicaFetcherThread.scala     | 12 +++-
 .../scala/kafka/server/ReplicaManager.scala     | 30 ++++++--
 .../kafka/server/ReplicaManagerQuotasTest.scala | 74 +++++++++++++++++---
 .../unit/kafka/server/SimpleFetchTest.scala     | 18 ++++-
 6 files changed, 133 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2f2cb4b..30f5125 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -304,6 +304,14 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
     stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
   }
 
+  def isReplicaInSync(topic: String, partitionId: Int): Boolean = {
+    val fetcherLagMetrics = stats.get(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+    if (fetcherLagMetrics != null)
+      fetcherLagMetrics.lag <= 0
+    else
+      false
+  }
+
   def unregister(topic: String, partitionId: Int) {
     val lagMetrics = stats.remove(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
     if (lagMetrics != null) lagMetrics.unregister()

http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 4b17e81..2feeae8 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -42,6 +42,7 @@ case class FetchMetadata(fetchMinBytes: Int,
                          fetchOnlyLeader: Boolean,
                          fetchOnlyCommitted: Boolean,
                          isFromFollower: Boolean,
+                         replicaId: Int,
                          fetchPartitionStatus: Seq[(TopicAndPartition, FetchPartitionStatus)]) {
 
   override def toString = "[minBytes: " + fetchMinBytes + ", " +
@@ -97,7 +98,8 @@ class DelayedFetch(delayMs: Long,
                 // Case C, this can happen when the fetch operation is falling behind the current segment
                 // or the partition has just rolled a new segment
                 debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
-                if (!(quota.isThrottled(topicAndPartition) && quota.isQuotaExceeded()))
+                // We will not force complete the fetch request if a replica should be throttled.
+                if (!replicaManager.shouldLeaderThrottle(quota, topicAndPartition, fetchMetadata.replicaId))
                   return forceComplete()
               } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
                 // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
@@ -139,12 +141,13 @@ class DelayedFetch(delayMs: Long,
    */
   override def onComplete() {
     val logReadResults = replicaManager.readFromLocalLog(
-      fetchMetadata.fetchOnlyLeader,
-      fetchMetadata.fetchOnlyCommitted,
-      fetchMetadata.fetchMaxBytes,
-      fetchMetadata.hardMaxBytesLimit,
-      fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
-      quota
+      replicaId = fetchMetadata.replicaId,
+      fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
+      readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
+      fetchMaxBytes = fetchMetadata.fetchMaxBytes,
+      hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
+      readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
+      quota = quota
     )
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 7930716..6f4c589 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -286,10 +286,10 @@ class ReplicaFetcherThread(name: String,
   protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
     val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
 
-    val quotaExceeded = quota.isQuotaExceeded
     partitionMap.foreach { case (topicPartition, partitionFetchState) =>
       val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
-      if (partitionFetchState.isActive && !(quota.isThrottled(topicAndPartition) && quotaExceeded))
+      // We will not include a replica in the fetch request if it should be throttled.
+      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicAndPartition))
         requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
     }
 
@@ -300,6 +300,14 @@ class ReplicaFetcherThread(name: String,
     new FetchRequest(request)
   }
 
+  /**
+   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   *  the quota is exceeded and the replica is not in sync.
+   */
+  private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition): Boolean = {
+    val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition.topic, topicPartition.partition)
+    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+  }
 }
 
 object ReplicaFetcherThread {

http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2c843e8..32bc660 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -466,8 +466,14 @@ class ReplicaManager(val config: KafkaConfig,
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
 
     // read from local logs
-    val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchMaxBytes, hardMaxBytesLimit,
-      fetchInfos, quota)
+    val logReadResults = readFromLocalLog(
+      replicaId = replicaId,
+      fetchOnlyFromLeader = fetchOnlyFromLeader,
+      readOnlyCommitted = fetchOnlyCommitted,
+      fetchMaxBytes = fetchMaxBytes,
+      hardMaxBytesLimit = hardMaxBytesLimit,
+      readPartitionInfo = fetchInfos,
+      quota = quota)
 
     // if the fetch comes from the follower,
     // update its corresponding log end offset
@@ -498,7 +504,7 @@ class ReplicaManager(val config: KafkaConfig,
         (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
       }
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
-        fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
+        fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
@@ -514,7 +520,8 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Read from multiple topic partitions at the given offset up to maxSize bytes
    */
-  def readFromLocalLog(fetchOnlyFromLeader: Boolean,
+  def readFromLocalLog(replicaId: Int,
+                       fetchOnlyFromLeader: Boolean,
                        readOnlyCommitted: Boolean,
                        fetchMaxBytes: Int,
                        hardMaxBytesLimit: Boolean,
@@ -559,8 +566,8 @@ class ReplicaManager(val config: KafkaConfig,
             // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
             val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)
 
-            // If the partition is marked as throttled, and we are over-quota then exclude it
-            if (quota.isThrottled(tp) && quota.isQuotaExceeded)
+            // If the partition is being throttled, simply return an empty set.
+            if (shouldLeaderThrottle(quota, tp, replicaId))
               FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
             // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
             // progress in such cases and don't need to report a `RecordTooLargeException`
@@ -607,6 +614,17 @@ class ReplicaManager(val config: KafkaConfig,
     result
   }
 
+  /**
+   *  To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list,
+   *  the quota is exceeded and the replica is not in sync.
+   */
+  def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition, replicaId: Int): Boolean = {
+    val isReplicaInSync = getPartition(topicPartition.topic, topicPartition.partition).flatMap { partition =>
+      partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
+    }.getOrElse(false)
+    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+  }
+
   def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] =
     getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica =>
       replica.log.map(_.config.messageFormatVersion.messageFormatVersion)

http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f8f8dda..17e0516 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -48,13 +48,21 @@ class ReplicaManagerQuotasTest {
   @Test
   def shouldExcludeSubsequentThrottledPartitions(): Unit = {
     setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
       fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
 
@@ -65,13 +73,21 @@ class ReplicaManagerQuotasTest {
   @Test
   def shouldGetNoMessagesIfQuotasExceededOnSubsequentPartitions(): Unit = {
     setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
       fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@@ -81,20 +97,53 @@ class ReplicaManagerQuotasTest {
   @Test
   def shouldGetBothMessagesIfQuotasAllow(): Unit = {
     setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
       fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
       fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
   }
 
-  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message) {
+  @Test
+  def shouldIncludeInSyncThrottledReplicas(): Unit = {
+    setUpMocks(fetchInfo, bothReplicasInSync = true)
+    val followerReplicaId = configs.last.brokerId
+
+    val quota = mockQuota(1000000)
+    expect(quota.isQuotaExceeded()).andReturn(false).once()
+    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    replay(quota)
+
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
+    assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
+      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
+
+    assertEquals("But we should get the second too since it's throttled but in sync", 1,
+      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
+  }
+
+  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message, bothReplicasInSync: Boolean = false) {
     val zkUtils = createNiceMock(classOf[ZkUtils])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
 
@@ -131,12 +180,19 @@ class ReplicaManagerQuotasTest {
     //create the two replicas
     for ((p, _) <- fetchInfo) {
       val partition = replicaManager.getOrCreatePartition(p.topic, p.partition)
-      val replica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
-      replica.highWatermark = new LogOffsetMetadata(5)
-      partition.leaderReplicaIdOpt = Some(replica.brokerId)
-      val allReplicas = List(replica)
+      val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
+      leaderReplica.highWatermark = new LogOffsetMetadata(5)
+      partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
+      val followerReplica = new Replica(configs.last.brokerId, partition, time, 0, Some(log))
+      val allReplicas = Set(leaderReplica, followerReplica)
       allReplicas.foreach(partition.addReplicaIfNotExists(_))
-      partition.inSyncReplicas = allReplicas.toSet
+      if (bothReplicasInSync) {
+        partition.inSyncReplicas = allReplicas
+        followerReplica.highWatermark = new LogOffsetMetadata(5)
+      } else {
+        partition.inSyncReplicas = Set(leaderReplica)
+        followerReplica.highWatermark = new LogOffsetMetadata(0)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/24067e40/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 71c2b41..cbd751b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -149,9 +149,23 @@ class SimpleFetchTest {
     val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
 
     assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
-      replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(
+        replicaId = Request.OrdinaryConsumerId,
+        fetchOnlyFromLeader = true,
+        readOnlyCommitted = true,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        readPartitionInfo = fetchInfo,
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
     assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
-      replicaManager.readFromLocalLog(true, false, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(
+        replicaId = Request.OrdinaryConsumerId,
+        fetchOnlyFromLeader = true,
+        readOnlyCommitted = false,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        readPartitionInfo = fetchInfo,
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
 
     assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
     assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())