You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/01 22:30:26 UTC

[kafka] branch trunk updated: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics (#8586)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 794648a  KAFKA-9939; Fix overcounting delayed fetches in request rate metrics (#8586)
794648a is described below

commit 794648aa554b5566268ff3eb650b14610a34091c
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 1 15:29:49 2020 -0700

    KAFKA-9939; Fix overcounting delayed fetches in request rate metrics (#8586)
    
    Fetches that hit purgatory are currently counted twice in the fetch request rate metrics: once on the initial read and again when the delayed fetch completes. This patch moves the metric update into `fetchMessages` so that each fetch request is counted only once.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |   6 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  54 +++++-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  | 206 ---------------------
 3 files changed, 54 insertions(+), 212 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4d917d5..5bdc3d2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -973,6 +973,9 @@ class ReplicaManager(val config: KafkaConfig,
     val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
     var anyPartitionsNeedHwUpdate = false
     logReadResults.foreach { case (topicPartition, logReadResult) =>
+      brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
+      brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
+
       if (logReadResult.error != Errors.NONE)
         errorReadingData = true
       bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
@@ -1035,9 +1038,6 @@ class ReplicaManager(val config: KafkaConfig,
       val partitionFetchSize = fetchInfo.maxBytes
       val followerLogStartOffset = fetchInfo.logStartOffset
 
-      brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
-      brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
-
       val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
       try {
         if (traceEnabled)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index aa5e410..0ed04d5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -79,6 +79,7 @@ class ReplicaManagerTest {
 
   @After
   def tearDown(): Unit = {
+    TestUtils.clearYammerMetrics()
     metrics.close()
   }
 
@@ -519,7 +520,7 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testFetchBeyondHighWatermarkReturnEmptyResponse(): Unit = {
+  def testFetchBeyondHighWatermark(): Unit = {
     val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2))
     try {
       val brokerList = Seq[Integer](0, 1, 2).asJava
@@ -553,14 +554,16 @@ class ReplicaManagerTest {
         }
       }
 
-      // Fetch a message above the high watermark as a follower
+      // Followers are always allowed to fetch above the high watermark
       val followerFetchResult = fetchAsFollower(rm, new TopicPartition(topic, 0),
         new PartitionData(1, 0, 100000, Optional.empty()))
       val followerFetchData = followerFetchResult.assertFired
       assertEquals("Should not give an exception", Errors.NONE, followerFetchData.error)
       assertTrue("Should return some data", followerFetchData.records.batches.iterator.hasNext)
 
-      // Fetch a message above the high watermark as a consumer
+      // Consumers are not allowed to consume above the high watermark. However, since the
+      // high watermark could be stale at the time of the request, we do not return an out of
+      // range error and instead return an empty record set.
       val consumerFetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0),
         new PartitionData(1, 0, 100000, Optional.empty()))
       val consumerFetchData = consumerFetchResult.assertFired
@@ -1018,6 +1021,51 @@ class ReplicaManagerTest {
   }
 
   @Test
+  def testFetchRequestRateMetrics(): Unit = {
+    val mockTimer = new MockTimer
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
+
+    val tp0 = new TopicPartition(topic, 0)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+    val partition0Replicas = Seq[Integer](0, 1).asJava
+
+    val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+
+    def assertMetricCount(expected: Int): Unit = {
+      assertEquals(expected, replicaManager.brokerTopicStats.allTopicsStats.totalFetchRequestRate.count)
+      assertEquals(expected, replicaManager.brokerTopicStats.topicStats(topic).totalFetchRequestRate.count)
+    }
+
+    val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.empty())
+
+    val nonPurgatoryFetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 0)
+    assertNotNull(nonPurgatoryFetchResult.get)
+    assertEquals(Errors.NONE, nonPurgatoryFetchResult.get.error)
+    assertMetricCount(1)
+
+    val purgatoryFetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10)
+    assertNull(purgatoryFetchResult.get)
+    mockTimer.advanceClock(11)
+    assertNotNull(purgatoryFetchResult.get)
+    assertEquals(Errors.NONE, purgatoryFetchResult.get.error)
+    assertMetricCount(2)
+  }
+
+  @Test
   def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = {
     val mockTimer = new MockTimer
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
deleted file mode 100644
index b320546..0000000
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.io.File
-
-import kafka.api._
-import kafka.utils._
-import kafka.log.Log
-import kafka.log.LogManager
-import kafka.server.QuotaFactory.UnboundedQuota
-import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.junit.{After, Before, Test}
-import java.util.{Optional, Properties}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
-import org.easymock.EasyMock
-import org.junit.Assert._
-
-class SimpleFetchTest {
-
-  val replicaLagTimeMaxMs = 100L
-  val replicaFetchWaitMaxMs = 100
-  val replicaLagMaxMessages = 10L
-
-  val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
-  overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
-
-  val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, overridingProps))
-
-  // set the replica manager with the partition
-  val time = new MockTime
-  val metrics = new Metrics
-  val leaderLEO = 20L
-  val followerLEO = 15L
-  val partitionHW = 5
-
-  val fetchSize = 100
-  val recordToHW = new SimpleRecord("recordToHW".getBytes())
-  val recordToLEO = new SimpleRecord("recordToLEO".getBytes())
-
-  val topic = "test-topic"
-  val partitionId = 0
-  val topicPartition = new TopicPartition(topic, partitionId)
-
-  val fetchInfo = Seq(topicPartition -> new PartitionData(0, 0, fetchSize,
-    Optional.empty()))
-
-  var replicaManager: ReplicaManager = _
-
-  @Before
-  def setUp(): Unit = {
-    // create nice mock since we don't particularly care about zkclient calls
-    val kafkaZkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
-    EasyMock.replay(kafkaZkClient)
-
-    // create nice mock since we don't particularly care about scheduler calls
-    val scheduler: KafkaScheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
-    EasyMock.replay(scheduler)
-
-    // create the log which takes read with either HW max offset or none max offset
-    val log: Log = EasyMock.createNiceMock(classOf[Log])
-    EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
-    EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
-    EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
-    EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLEO)).anyTimes()
-    EasyMock.expect(log.maybeIncrementHighWatermark(EasyMock.anyObject[LogOffsetMetadata]))
-      .andReturn(Some(LogOffsetMetadata(partitionHW))).anyTimes()
-    EasyMock.expect(log.highWatermark).andReturn(partitionHW).anyTimes()
-    EasyMock.expect(log.lastStableOffset).andReturn(partitionHW).anyTimes()
-    EasyMock.expect(log.read(
-      startOffset = 0,
-      maxLength = fetchSize,
-      isolation = FetchHighWatermark,
-      minOneMessage = true))
-      .andReturn(FetchDataInfo(
-        LogOffsetMetadata(0L, 0L, 0),
-        MemoryRecords.withRecords(CompressionType.NONE, recordToHW)
-      )).anyTimes()
-    EasyMock.expect(log.read(
-      startOffset = 0,
-      maxLength = fetchSize,
-      isolation = FetchLogEnd,
-      minOneMessage = true))
-      .andReturn(FetchDataInfo(
-        LogOffsetMetadata(0L, 0L, 0),
-        MemoryRecords.withRecords(CompressionType.NONE, recordToLEO)
-      )).anyTimes()
-    EasyMock.replay(log)
-
-    // create the log manager that is aware of this mock log
-    val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
-    EasyMock.expect(logManager.getLog(topicPartition, false)).andReturn(Some(log)).anyTimes()
-    EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
-    EasyMock.replay(logManager)
-
-    // create the replica manager
-    replicaManager = new ReplicaManager(configs.head, metrics, time, kafkaZkClient, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats,
-      new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
-
-    // add the partition with two replicas, both in ISR
-    val partition = replicaManager.createPartition(new TopicPartition(topic, partitionId))
-
-    // create the leader replica with the local log
-    log.updateHighWatermark(partitionHW)
-    partition.leaderReplicaIdOpt = Some(configs.head.brokerId)
-    partition.setLog(log, false)
-
-    // create the follower replica with defined log end offset
-    val followerId = configs(1).brokerId
-    val allReplicas = Seq(configs.head.brokerId, followerId)
-    partition.updateAssignmentAndIsr(
-      assignment = allReplicas,
-      isr = allReplicas.toSet,
-      addingReplicas = Seq.empty,
-      removingReplicas = Seq.empty
-    )
-    val leo = LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
-    partition.updateFollowerFetchState(
-      followerId,
-      followerFetchOffsetMetadata = leo,
-      followerStartOffset = 0L,
-      followerFetchTimeMs= time.milliseconds,
-      leaderEndOffset = leo.messageOffset,
-      partition.localLogOrException.highWatermark)
-  }
-
-  @After
-  def tearDown(): Unit = {
-    replicaManager.shutdown(false)
-    metrics.close()
-  }
-
-  /**
-   * The scenario for this test is that there is one topic that has one partition
-   * with one leader replica on broker "0" and one follower replica on broker "1"
-   * inside the replica manager's metadata.
-   *
-   * The leader replica on "0" has HW of "5" and LEO of "20".  The follower on
-   * broker "1" has a local replica with a HW matching the leader's ("5") and
-   * LEO of "15", meaning it's not in-sync but is still in ISR (hasn't yet expired from ISR).
-   *
-   * When a fetch operation with read committed data turned on is received, the replica manager
-   * should only return data up to the HW of the partition; when a fetch operation with read
-   * committed data turned off is received, the replica manager could return data up to the LEO
-   * of the local leader replica's log.
-   *
-   * This test also verifies counts of fetch requests recorded by the ReplicaManager
-   */
-  @Test
-  def testReadFromLog(): Unit = {
-    val brokerTopicStats = new BrokerTopicStats
-    val initialTopicCount = brokerTopicStats.topicStats(topic).totalFetchRequestRate.count()
-    val initialAllTopicsCount = brokerTopicStats.allTopicsStats.totalFetchRequestRate.count()
-
-    val readCommittedRecords = replicaManager.readFromLocalLog(
-      replicaId = Request.OrdinaryConsumerId,
-      fetchOnlyFromLeader = true,
-      fetchIsolation = FetchHighWatermark,
-      fetchMaxBytes = Int.MaxValue,
-      hardMaxBytesLimit = false,
-      readPartitionInfo = fetchInfo,
-      quota = UnboundedQuota,
-      clientMetadata = None).find(_._1 == topicPartition)
-    val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next()
-    assertEquals("Reading committed data should return messages only up to high watermark", recordToHW,
-      new SimpleRecord(firstReadRecord))
-
-    val readAllRecords = replicaManager.readFromLocalLog(
-      replicaId = Request.OrdinaryConsumerId,
-      fetchOnlyFromLeader = true,
-      fetchIsolation = FetchLogEnd,
-      fetchMaxBytes = Int.MaxValue,
-      hardMaxBytesLimit = false,
-      readPartitionInfo = fetchInfo,
-      quota = UnboundedQuota,
-      clientMetadata = None).find(_._1 == topicPartition)
-
-    val firstRecord = readAllRecords.get._2.info.records.records.iterator.next()
-    assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,
-      new SimpleRecord(firstRecord))
-
-    assertEquals("Counts should increment after fetch", initialTopicCount+2, brokerTopicStats.topicStats(topic).totalFetchRequestRate.count())
-    assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, brokerTopicStats.allTopicsStats.totalFetchRequestRate.count())
-  }
-}