Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/01 22:30:26 UTC
[kafka] branch trunk updated: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics (#8586)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 794648a KAFKA-9939; Fix overcounting delayed fetches in request rate metrics (#8586)
794648a is described below
commit 794648aa554b5566268ff3eb650b14610a34091c
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 1 15:29:49 2020 -0700
KAFKA-9939; Fix overcounting delayed fetches in request rate metrics (#8586)
Fetches which hit purgatory are currently counted twice in fetch request rate metrics. This patch moves the metric update into `fetchMessages` so that they are only counted once.
Reviewers: Ismael Juma <is...@juma.me.uk>
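For illustration, a minimal, self-contained Scala sketch of the double-counting pattern this patch removes. The names echo the real code (`readFromLocalLog`, `fetchMessages`, `totalFetchRequestRate`), but `Meter` and `DoubleCountSketch` are simplified stand-ins, not Kafka's actual implementation:

// Hypothetical stand-in for the Yammer meter held by BrokerTopicStats.
class Meter { private var n = 0L; def mark(): Unit = n += 1; def count: Long = n }

object DoubleCountSketch {
  // The per-read path; before the fix, this is where the meter was marked.
  private def readFromLocalLog(meter: Option[Meter]): Unit = {
    meter.foreach(_.mark())
    // ... read records from the local log ...
  }

  // Before the fix: a fetch that cannot be satisfied immediately goes into
  // purgatory, and the delayed fetch re-reads the log when it completes, so
  // the same client fetch marks the meter twice.
  def fetchBeforeFix(meter: Meter, hitsPurgatory: Boolean): Unit = {
    readFromLocalLog(Some(meter))                    // initial attempt
    if (hitsPurgatory) readFromLocalLog(Some(meter)) // re-read on completion
  }

  // After the fix: fetchMessages marks the meter exactly once per request,
  // regardless of how many times the log is re-read.
  def fetchAfterFix(meter: Meter, hitsPurgatory: Boolean): Unit = {
    meter.mark()
    readFromLocalLog(None)
    if (hitsPurgatory) readFromLocalLog(None)
  }

  def main(args: Array[String]): Unit = {
    val before = new Meter
    fetchBeforeFix(before, hitsPurgatory = true)
    println(before.count) // 2 -- the delayed fetch is overcounted

    val after = new Meter
    fetchAfterFix(after, hitsPurgatory = true)
    println(after.count)  // 1 -- counted once, as the new test asserts
  }
}

The new `testFetchRequestRateMetrics` test in the diff below checks exactly this: a fetch that parks in purgatory and later completes should bump the meter by one, not two.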
---
.../main/scala/kafka/server/ReplicaManager.scala | 6 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 54 +++++-
.../scala/unit/kafka/server/SimpleFetchTest.scala | 206 ---------------------
3 files changed, 54 insertions(+), 212 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4d917d5..5bdc3d2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -973,6 +973,9 @@ class ReplicaManager(val config: KafkaConfig,
val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
var anyPartitionsNeedHwUpdate = false
logReadResults.foreach { case (topicPartition, logReadResult) =>
+ brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
+ brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
+
if (logReadResult.error != Errors.NONE)
errorReadingData = true
bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
@@ -1035,9 +1038,6 @@ class ReplicaManager(val config: KafkaConfig,
val partitionFetchSize = fetchInfo.maxBytes
val followerLogStartOffset = fetchInfo.logStartOffset
- brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
- brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
-
val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
try {
if (traceEnabled)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index aa5e410..0ed04d5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -79,6 +79,7 @@ class ReplicaManagerTest {
@After
def tearDown(): Unit = {
+ TestUtils.clearYammerMetrics()
metrics.close()
}
@@ -519,7 +520,7 @@ class ReplicaManagerTest {
}
@Test
- def testFetchBeyondHighWatermarkReturnEmptyResponse(): Unit = {
+ def testFetchBeyondHighWatermark(): Unit = {
val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2))
try {
val brokerList = Seq[Integer](0, 1, 2).asJava
@@ -553,14 +554,16 @@ class ReplicaManagerTest {
}
}
- // Fetch a message above the high watermark as a follower
+ // Followers are always allowed to fetch above the high watermark
val followerFetchResult = fetchAsFollower(rm, new TopicPartition(topic, 0),
new PartitionData(1, 0, 100000, Optional.empty()))
val followerFetchData = followerFetchResult.assertFired
assertEquals("Should not give an exception", Errors.NONE, followerFetchData.error)
assertTrue("Should return some data", followerFetchData.records.batches.iterator.hasNext)
- // Fetch a message above the high watermark as a consumer
+ // Consumers are not allowed to consume above the high watermark. However, since the
+ // high watermark could be stale at the time of the request, we do not return an out of
+ // range error and instead return an empty record set.
val consumerFetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0),
new PartitionData(1, 0, 100000, Optional.empty()))
val consumerFetchData = consumerFetchResult.assertFired
@@ -1018,6 +1021,51 @@ class ReplicaManagerTest {
}
@Test
+ def testFetchRequestRateMetrics(): Unit = {
+ val mockTimer = new MockTimer
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
+
+ val tp0 = new TopicPartition(topic, 0)
+ val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+ val partition0Replicas = Seq[Integer](0, 1).asJava
+
+ val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(tp0.topic)
+ .setPartitionIndex(tp0.partition)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(1)
+ .setIsr(partition0Replicas)
+ .setZkVersion(0)
+ .setReplicas(partition0Replicas)
+ .setIsNew(true)).asJava,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+
+ def assertMetricCount(expected: Int): Unit = {
+ assertEquals(expected, replicaManager.brokerTopicStats.allTopicsStats.totalFetchRequestRate.count)
+ assertEquals(expected, replicaManager.brokerTopicStats.topicStats(topic).totalFetchRequestRate.count)
+ }
+
+ val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+ Optional.empty())
+
+ val nonPurgatoryFetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 0)
+ assertNotNull(nonPurgatoryFetchResult.get)
+ assertEquals(Errors.NONE, nonPurgatoryFetchResult.get.error)
+ assertMetricCount(1)
+
+ val purgatoryFetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10)
+ assertNull(purgatoryFetchResult.get)
+ mockTimer.advanceClock(11)
+ assertNotNull(purgatoryFetchResult.get)
+ assertEquals(Errors.NONE, purgatoryFetchResult.get.error)
+ assertMetricCount(2)
+ }
+
+ @Test
def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = {
val mockTimer = new MockTimer
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
deleted file mode 100644
index b320546..0000000
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.io.File
-
-import kafka.api._
-import kafka.utils._
-import kafka.log.Log
-import kafka.log.LogManager
-import kafka.server.QuotaFactory.UnboundedQuota
-import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.junit.{After, Before, Test}
-import java.util.{Optional, Properties}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
-import org.easymock.EasyMock
-import org.junit.Assert._
-
-class SimpleFetchTest {
-
- val replicaLagTimeMaxMs = 100L
- val replicaFetchWaitMaxMs = 100
- val replicaLagMaxMessages = 10L
-
- val overridingProps = new Properties()
- overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
- overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
-
- val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, overridingProps))
-
- // set the replica manager with the partition
- val time = new MockTime
- val metrics = new Metrics
- val leaderLEO = 20L
- val followerLEO = 15L
- val partitionHW = 5
-
- val fetchSize = 100
- val recordToHW = new SimpleRecord("recordToHW".getBytes())
- val recordToLEO = new SimpleRecord("recordToLEO".getBytes())
-
- val topic = "test-topic"
- val partitionId = 0
- val topicPartition = new TopicPartition(topic, partitionId)
-
- val fetchInfo = Seq(topicPartition -> new PartitionData(0, 0, fetchSize,
- Optional.empty()))
-
- var replicaManager: ReplicaManager = _
-
- @Before
- def setUp(): Unit = {
- // create nice mock since we don't particularly care about zkclient calls
- val kafkaZkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
- EasyMock.replay(kafkaZkClient)
-
- // create nice mock since we don't particularly care about scheduler calls
- val scheduler: KafkaScheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
- EasyMock.replay(scheduler)
-
- // create the log which takes read with either HW max offset or none max offset
- val log: Log = EasyMock.createNiceMock(classOf[Log])
- EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
- EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
- EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
- EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLEO)).anyTimes()
- EasyMock.expect(log.maybeIncrementHighWatermark(EasyMock.anyObject[LogOffsetMetadata]))
- .andReturn(Some(LogOffsetMetadata(partitionHW))).anyTimes()
- EasyMock.expect(log.highWatermark).andReturn(partitionHW).anyTimes()
- EasyMock.expect(log.lastStableOffset).andReturn(partitionHW).anyTimes()
- EasyMock.expect(log.read(
- startOffset = 0,
- maxLength = fetchSize,
- isolation = FetchHighWatermark,
- minOneMessage = true))
- .andReturn(FetchDataInfo(
- LogOffsetMetadata(0L, 0L, 0),
- MemoryRecords.withRecords(CompressionType.NONE, recordToHW)
- )).anyTimes()
- EasyMock.expect(log.read(
- startOffset = 0,
- maxLength = fetchSize,
- isolation = FetchLogEnd,
- minOneMessage = true))
- .andReturn(FetchDataInfo(
- LogOffsetMetadata(0L, 0L, 0),
- MemoryRecords.withRecords(CompressionType.NONE, recordToLEO)
- )).anyTimes()
- EasyMock.replay(log)
-
- // create the log manager that is aware of this mock log
- val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
- EasyMock.expect(logManager.getLog(topicPartition, false)).andReturn(Some(log)).anyTimes()
- EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
- EasyMock.replay(logManager)
-
- // create the replica manager
- replicaManager = new ReplicaManager(configs.head, metrics, time, kafkaZkClient, scheduler, logManager,
- new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats,
- new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
-
- // add the partition with two replicas, both in ISR
- val partition = replicaManager.createPartition(new TopicPartition(topic, partitionId))
-
- // create the leader replica with the local log
- log.updateHighWatermark(partitionHW)
- partition.leaderReplicaIdOpt = Some(configs.head.brokerId)
- partition.setLog(log, false)
-
- // create the follower replica with defined log end offset
- val followerId = configs(1).brokerId
- val allReplicas = Seq(configs.head.brokerId, followerId)
- partition.updateAssignmentAndIsr(
- assignment = allReplicas,
- isr = allReplicas.toSet,
- addingReplicas = Seq.empty,
- removingReplicas = Seq.empty
- )
- val leo = LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
- partition.updateFollowerFetchState(
- followerId,
- followerFetchOffsetMetadata = leo,
- followerStartOffset = 0L,
- followerFetchTimeMs= time.milliseconds,
- leaderEndOffset = leo.messageOffset,
- partition.localLogOrException.highWatermark)
- }
-
- @After
- def tearDown(): Unit = {
- replicaManager.shutdown(false)
- metrics.close()
- }
-
- /**
- * The scenario for this test is that there is one topic that has one partition
- * with one leader replica on broker "0" and one follower replica on broker "1"
- * inside the replica manager's metadata.
- *
- * The leader replica on "0" has HW of "5" and LEO of "20". The follower on
- * broker "1" has a local replica with a HW matching the leader's ("5") and
- * LEO of "15", meaning it's not in-sync but is still in ISR (hasn't yet expired from ISR).
- *
- * When a fetch operation with read committed data turned on is received, the replica manager
- * should only return data up to the HW of the partition; when a fetch operation with read
- * committed data turned off is received, the replica manager could return data up to the LEO
- * of the local leader replica's log.
- *
- * This test also verifies counts of fetch requests recorded by the ReplicaManager
- */
- @Test
- def testReadFromLog(): Unit = {
- val brokerTopicStats = new BrokerTopicStats
- val initialTopicCount = brokerTopicStats.topicStats(topic).totalFetchRequestRate.count()
- val initialAllTopicsCount = brokerTopicStats.allTopicsStats.totalFetchRequestRate.count()
-
- val readCommittedRecords = replicaManager.readFromLocalLog(
- replicaId = Request.OrdinaryConsumerId,
- fetchOnlyFromLeader = true,
- fetchIsolation = FetchHighWatermark,
- fetchMaxBytes = Int.MaxValue,
- hardMaxBytesLimit = false,
- readPartitionInfo = fetchInfo,
- quota = UnboundedQuota,
- clientMetadata = None).find(_._1 == topicPartition)
- val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next()
- assertEquals("Reading committed data should return messages only up to high watermark", recordToHW,
- new SimpleRecord(firstReadRecord))
-
- val readAllRecords = replicaManager.readFromLocalLog(
- replicaId = Request.OrdinaryConsumerId,
- fetchOnlyFromLeader = true,
- fetchIsolation = FetchLogEnd,
- fetchMaxBytes = Int.MaxValue,
- hardMaxBytesLimit = false,
- readPartitionInfo = fetchInfo,
- quota = UnboundedQuota,
- clientMetadata = None).find(_._1 == topicPartition)
-
- val firstRecord = readAllRecords.get._2.info.records.records.iterator.next()
- assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,
- new SimpleRecord(firstRecord))
-
- assertEquals("Counts should increment after fetch", initialTopicCount+2, brokerTopicStats.topicStats(topic).totalFetchRequestRate.count())
- assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, brokerTopicStats.allTopicsStats.totalFetchRequestRate.count())
- }
-}
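The read-isolation behavior the deleted SimpleFetchTest exercised (consumers read only up to the high watermark, followers read to the log end, and a consumer fetch above the HW returns an empty record set rather than an error) can be summarized with a self-contained Scala sketch. `FetchHighWatermark` and `FetchLogEnd` mirror the isolation levels used in the test above; `SketchLog` and its `read` method are hypothetical stand-ins for Kafka's `Log`:

// Hypothetical isolation levels mirroring the test's FetchIsolation values.
sealed trait FetchIsolation
case object FetchHighWatermark extends FetchIsolation // consumer reads
case object FetchLogEnd extends FetchIsolation        // follower reads

// Simplified stand-in for a partition log: records indexed by offset.
final case class SketchLog(records: Vector[String], highWatermark: Long) {
  def logEndOffset: Long = records.length.toLong

  // Read from startOffset up to the bound implied by the isolation level:
  // consumers stop at the high watermark, followers may read to the log end.
  def read(startOffset: Long, isolation: FetchIsolation): Vector[String] = {
    val maxOffset = isolation match {
      case FetchHighWatermark => highWatermark
      case FetchLogEnd        => logEndOffset
    }
    records.slice(startOffset.toInt, maxOffset.toInt)
  }
}

object FetchIsolationSketch extends App {
  // Same shape as the deleted test's scenario: leader LEO 20, HW 5.
  val log = SketchLog(Vector.tabulate(20)(i => s"record-$i"), highWatermark = 5)
  assert(log.read(0, FetchHighWatermark).size == 5) // consumer: up to HW
  assert(log.read(0, FetchLogEnd).size == 20)       // follower: up to LEO
  // A consumer fetch above the HW yields an empty set rather than an
  // out-of-range error, since the HW may simply be stale at request time.
  assert(log.read(6, FetchHighWatermark).isEmpty)
  println("isolation sketch OK")
}

The empty-result case matches the comment this patch adds to testFetchBeyondHighWatermark, which now covers the scenario previously spread across SimpleFetchTest.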