You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/05/25 23:32:42 UTC
[kafka] branch 1.0 updated: KAFKA-6937: In-sync replica delayed during fetch if replica throttle is exceeded (#5074)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 274bb7a KAFKA-6937: In-sync replica delayed during fetch if replica throttle is exceeded (#5074)
274bb7a is described below
commit 274bb7af895ab5988b38794e0753a45a76f779ae
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Fri May 25 16:19:12 2018 -0700
KAFKA-6937: In-sync replica delayed during fetch if replica throttle is exceeded (#5074)
Reviewers: Ismael Juma <is...@juma.me.uk>, Dong Lin <li...@gmail.com>, Ben Stopford <be...@gmail.com>
---
.../src/main/scala/kafka/server/DelayedFetch.scala | 10 ++-----
core/src/main/scala/kafka/server/KafkaApis.scala | 2 ++
.../scala/kafka/server/ReplicaFetcherThread.scala | 3 ++
.../kafka/server/ReplicaManagerQuotasTest.scala | 35 +++++++++++++++++++++-
4 files changed, 42 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index e478053..49bea5a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -77,7 +77,6 @@ class DelayedFetch(delayMs: Long,
*/
override def tryComplete() : Boolean = {
var accumulatedSize = 0
- var accumulatedThrottledSize = 0
fetchMetadata.fetchPartitionStatus.foreach {
case (topicPartition, fetchStatus) =>
val fetchOffset = fetchStatus.startOffsetMetadata
@@ -110,9 +109,7 @@ class DelayedFetch(delayMs: Long,
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
// we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
- if (quota.isThrottled(topicPartition))
- accumulatedThrottledSize += bytesAvailable
- else
+ if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
accumulatedSize += bytesAvailable
}
}
@@ -131,9 +128,8 @@ class DelayedFetch(delayMs: Long,
}
// Case D
- if (accumulatedSize >= fetchMetadata.fetchMinBytes
- || ((accumulatedSize + accumulatedThrottledSize) >= fetchMetadata.fetchMinBytes && !quota.isQuotaExceeded()))
- forceComplete()
+ if (accumulatedSize >= fetchMetadata.fetchMinBytes)
+ forceComplete()
else
false
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ee004f5..82d773c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -611,6 +611,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ // Traffic from both in-sync and out-of-sync replicas is accounted for in the replication quota to ensure total replication
+ // traffic doesn't exceed the quota.
private def sizeOfThrottledPartitions(versionId: Short,
fetchRequest: FetchRequest,
mergedPartitionData: Seq[(TopicPartition, FetchResponse.PartitionData)],
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 850e882..2fb0448 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -113,6 +113,9 @@ class ReplicaFetcherThread(name: String,
replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
if (logger.isTraceEnabled)
trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
+
+ // Traffic from both in-sync and out-of-sync replicas is accounted for in the replication quota to ensure total replication
+ // traffic doesn't exceed the quota.
if (quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)
replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index b27e7d9..2ba2777 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -147,6 +147,38 @@ class ReplicaManagerQuotasTest {
fetch.find(_._1 == topicPartition2).get._2.info.records.batches.asScala.size)
}
+ @Test
+ def testCompleteInDelayedFetchWithReplicaThrottling(): Unit = {
+ // Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
+ def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
+ val logOffsetMetadata = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+ val replica = EasyMock.createMock(classOf[Replica])
+ EasyMock.expect(replica.logEndOffset).andReturn(logOffsetMetadata).anyTimes()
+ EasyMock.replay(replica)
+
+ val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+ EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(EasyMock.anyObject[TopicPartition])).andReturn(replica).anyTimes()
+ EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[TopicPartition], EasyMock.anyObject[Int]))
+ .andReturn(!isReplicaInSync).anyTimes()
+ EasyMock.replay(replicaManager)
+
+ val tp = new TopicPartition("t1", 0)
+ val fetchParititonStatus = new FetchPartitionStatus(new LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L,
+ relativePositionInSegment = 250), new PartitionData(50, 0, 1))
+ val fetchMetadata = new FetchMetadata(fetchMinBytes = 1, fetchMaxBytes = 1000, hardMaxBytesLimit = true, fetchOnlyLeader = true,
+ fetchOnlyCommitted = false, isFromFollower = true, replicaId = 1, fetchPartitionStatus = List((tp, fetchParititonStatus)))
+ new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager,
+ quota = null, isolationLevel = IsolationLevel.READ_UNCOMMITTED, responseCallback = null) {
+ override def forceComplete(): Boolean = {
+ true
+ }
+ }
+ }
+
+ assertTrue("In sync replica should complete", setupDelayedFetch(isReplicaInSync = true).tryComplete())
+ assertFalse("Out of sync replica should not complete", setupDelayedFetch(isReplicaInSync = false).tryComplete())
+ }
+
def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
val zkUtils = createNiceMock(classOf[ZkUtils])
val scheduler = createNiceMock(classOf[KafkaScheduler])
@@ -207,7 +239,8 @@ class ReplicaManagerQuotasTest {
@After
def tearDown() {
- replicaManager.shutdown(false)
+ if (replicaManager != null)
+ replicaManager.shutdown(false)
metrics.close()
}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.