You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/05/25 23:22:44 UTC

[kafka] branch 1.1 updated: KAFKA-6937: In-sync replica delayed during fetch if replica throttle is exceeded (#5074)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 552addd  KAFKA-6937: In-sync replica delayed during fetch if replica throttle is exceeded (#5074)
552addd is described below

commit 552adddadd886921beecf1284fe8c5a110ddd8ce
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Fri May 25 16:19:12 2018 -0700

    KAFKA-6937: In-sync replica delayed during fetch if replica throttle is exceeded (#5074)
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Dong Lin <li...@gmail.com>, Ben Stopford <be...@gmail.com>
---
 .../src/main/scala/kafka/server/DelayedFetch.scala | 10 ++-----
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 ++
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  3 ++
 .../kafka/server/ReplicaManagerQuotasTest.scala    | 35 +++++++++++++++++++++-
 4 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index e478053..49bea5a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -77,7 +77,6 @@ class DelayedFetch(delayMs: Long,
    */
   override def tryComplete() : Boolean = {
     var accumulatedSize = 0
-    var accumulatedThrottledSize = 0
     fetchMetadata.fetchPartitionStatus.foreach {
       case (topicPartition, fetchStatus) =>
         val fetchOffset = fetchStatus.startOffsetMetadata
@@ -110,9 +109,7 @@ class DelayedFetch(delayMs: Long,
               } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
                 // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
                 val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
-                if (quota.isThrottled(topicPartition))
-                  accumulatedThrottledSize += bytesAvailable
-                else
+                if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
                   accumulatedSize += bytesAvailable
               }
             }
@@ -131,9 +128,8 @@ class DelayedFetch(delayMs: Long,
     }
 
     // Case D
-    if (accumulatedSize >= fetchMetadata.fetchMinBytes
-      || ((accumulatedSize + accumulatedThrottledSize) >= fetchMetadata.fetchMinBytes && !quota.isQuotaExceeded()))
-      forceComplete()
+    if (accumulatedSize >= fetchMetadata.fetchMinBytes)
+       forceComplete()
     else
       false
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5d68c98..62b3ba3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -659,6 +659,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     override def remove() = throw new UnsupportedOperationException()
   }
 
+  // Traffic from both in-sync and out-of-sync replicas is accounted for in the replication quota to ensure total replication
+  // traffic doesn't exceed the quota.
   private def sizeOfThrottledPartitions(versionId: Short,
                                         unconvertedResponse: FetchResponse,
                                         quota: ReplicationQuotaManager): Int = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8344d5b..fa45e7f 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -123,6 +123,9 @@ class ReplicaFetcherThread(name: String,
     replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
     if (isTraceEnabled)
       trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
+
+    // Traffic from both in-sync and out-of-sync replicas is accounted for in the replication quota to ensure total replication
+    // traffic doesn't exceed the quota.
     if (quota.isThrottled(topicPartition))
       quota.record(records.sizeInBytes)
     replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 334321a..c6efca5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -148,6 +148,38 @@ class ReplicaManagerQuotasTest {
       fetch.find(_._1 == topicPartition2).get._2.info.records.batches.asScala.size)
   }
 
+  @Test
+  def testCompleteInDelayedFetchWithReplicaThrottling(): Unit = {
+    // Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
+    def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
+      val logOffsetMetadata = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+      val replica = EasyMock.createMock(classOf[Replica])
+      EasyMock.expect(replica.logEndOffset).andReturn(logOffsetMetadata).anyTimes()
+      EasyMock.replay(replica)
+
+      val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+      EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(EasyMock.anyObject[TopicPartition])).andReturn(replica).anyTimes()
+      EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[TopicPartition], EasyMock.anyObject[Int]))
+        .andReturn(!isReplicaInSync).anyTimes()
+      EasyMock.replay(replicaManager)
+
+      val tp = new TopicPartition("t1", 0)
+      val fetchParititonStatus = new FetchPartitionStatus(new LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L,
+        relativePositionInSegment = 250), new PartitionData(50, 0, 1))
+      val fetchMetadata = new FetchMetadata(fetchMinBytes = 1, fetchMaxBytes = 1000, hardMaxBytesLimit = true, fetchOnlyLeader = true,
+        fetchOnlyCommitted = false, isFromFollower = true, replicaId = 1, fetchPartitionStatus = List((tp, fetchParititonStatus)))
+      new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager,
+        quota = null, isolationLevel = IsolationLevel.READ_UNCOMMITTED, responseCallback = null) {
+        override def forceComplete(): Boolean = {
+          true
+        }
+      }
+    }
+
+    assertTrue("In sync replica should complete", setupDelayedFetch(isReplicaInSync = true).tryComplete())
+    assertFalse("Out of sync replica should not complete", setupDelayedFetch(isReplicaInSync = false).tryComplete())
+  }
+
   def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
     val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
@@ -208,7 +240,8 @@ class ReplicaManagerQuotasTest {
 
   @After
   def tearDown() {
-    replicaManager.shutdown(false)
+    if (replicaManager != null)
+      replicaManager.shutdown(false)
     metrics.close()
   }
 

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.