Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/29 20:15:08 UTC

[GitHub] [kafka] hachikuji opened a new pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

hachikuji opened a new pull request #8586:
URL: https://github.com/apache/kafka/pull/8586


   Fetches which hit purgatory are currently counted twice in the fetch request rate metrics. This patch moves the metric update into `fetchMessages` so that each fetch is counted only once.
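   
   A minimal, self-contained sketch of the over-counting and of the fix (illustrative names only, not the actual `ReplicaManager` code; `FetchRequestRate`, `fetchMessagesBefore` and `fetchMessagesAfter` are stand-ins):
   
   ```scala
   // Simplified stand-in for the broker's fetch request rate meter.
   final class FetchRequestRate { var count = 0; def mark(): Unit = count += 1 }
   
   object FetchMetricSketch {
     val totalFetchRequestRate = new FetchRequestRate
   
     // Pre-fix placement: the metric is marked in the log-read path, which runs once for an
     // immediately satisfied fetch but twice for a fetch parked in purgatory (once on the
     // initial attempt, again when the delayed fetch completes).
     private def readFromLocalLog(): Unit = totalFetchRequestRate.mark()
   
     def fetchMessagesBefore(hitsPurgatory: Boolean): Unit = {
       readFromLocalLog()                     // initial attempt
       if (hitsPurgatory) readFromLocalLog()  // re-read on delayed-fetch completion -> double count
     }
   
     // Post-fix placement: mark exactly once per fetch request, in fetchMessages itself.
     def fetchMessagesAfter(hitsPurgatory: Boolean): Unit = {
       totalFetchRequestRate.mark()
       // the read path / purgatory handling no longer touches the request rate metric
     }
   
     def main(args: Array[String]): Unit = {
       fetchMessagesBefore(hitsPurgatory = false)
       fetchMessagesBefore(hitsPurgatory = true)
       println(s"before: count = ${totalFetchRequestRate.count}")  // 3 -- the delayed fetch counted twice
   
       totalFetchRequestRate.count = 0
       fetchMessagesAfter(hitsPurgatory = false)
       fetchMessagesAfter(hitsPurgatory = true)
       println(s"after:  count = ${totalFetchRequestRate.count}")  // 2 -- one per fetch request
     }
   }
   ```
   The new `testFetchRequestRateMetrics` case in `ReplicaManagerTest` (quoted below) asserts exactly this 1-then-2 progression for an immediate and a delayed consumer fetch.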
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] edoardocomar commented on pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
edoardocomar commented on pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#issuecomment-621942562


   Hi @hachikuji, as you're working on this, do you mind taking a look at https://github.com/apache/kafka/pull/4204 ?
   
   The test we added there still fails with your change unless the workarounds we suggested are applied.
   Thanks





[GitHub] [kafka] ijuma commented on a change in pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#discussion_r418414804



##########
File path: core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
##########
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.io.File
-
-import kafka.api._
-import kafka.utils._
-import kafka.log.Log
-import kafka.log.LogManager
-import kafka.server.QuotaFactory.UnboundedQuota
-import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.junit.{After, Before, Test}
-import java.util.{Optional, Properties}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
-import org.easymock.EasyMock
-import org.junit.Assert._
-
-class SimpleFetchTest {
-
-  val replicaLagTimeMaxMs = 100L
-  val replicaFetchWaitMaxMs = 100
-  val replicaLagMaxMessages = 10L
-
-  val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
-  overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
-
-  val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, overridingProps))
-
-  // set the replica manager with the partition
-  val time = new MockTime
-  val metrics = new Metrics
-  val leaderLEO = 20L
-  val followerLEO = 15L
-  val partitionHW = 5
-
-  val fetchSize = 100
-  val recordToHW = new SimpleRecord("recordToHW".getBytes())
-  val recordToLEO = new SimpleRecord("recordToLEO".getBytes())
-
-  val topic = "test-topic"
-  val partitionId = 0
-  val topicPartition = new TopicPartition(topic, partitionId)
-
-  val fetchInfo = Seq(topicPartition -> new PartitionData(0, 0, fetchSize,
-    Optional.empty()))
-
-  var replicaManager: ReplicaManager = _
-
-  @Before
-  def setUp(): Unit = {
-    // create nice mock since we don't particularly care about zkclient calls
-    val kafkaZkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
-    EasyMock.replay(kafkaZkClient)
-
-    // create nice mock since we don't particularly care about scheduler calls
-    val scheduler: KafkaScheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
-    EasyMock.replay(scheduler)
-
-    // create the log which takes read with either HW max offset or none max offset
-    val log: Log = EasyMock.createNiceMock(classOf[Log])
-    EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
-    EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
-    EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
-    EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLEO)).anyTimes()
-    EasyMock.expect(log.maybeIncrementHighWatermark(EasyMock.anyObject[LogOffsetMetadata]))
-      .andReturn(Some(LogOffsetMetadata(partitionHW))).anyTimes()
-    EasyMock.expect(log.highWatermark).andReturn(partitionHW).anyTimes()
-    EasyMock.expect(log.lastStableOffset).andReturn(partitionHW).anyTimes()
-    EasyMock.expect(log.read(
-      startOffset = 0,
-      maxLength = fetchSize,
-      isolation = FetchHighWatermark,
-      minOneMessage = true))
-      .andReturn(FetchDataInfo(
-        LogOffsetMetadata(0L, 0L, 0),
-        MemoryRecords.withRecords(CompressionType.NONE, recordToHW)
-      )).anyTimes()
-    EasyMock.expect(log.read(
-      startOffset = 0,
-      maxLength = fetchSize,
-      isolation = FetchLogEnd,
-      minOneMessage = true))
-      .andReturn(FetchDataInfo(
-        LogOffsetMetadata(0L, 0L, 0),
-        MemoryRecords.withRecords(CompressionType.NONE, recordToLEO)
-      )).anyTimes()
-    EasyMock.replay(log)
-
-    // create the log manager that is aware of this mock log
-    val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
-    EasyMock.expect(logManager.getLog(topicPartition, false)).andReturn(Some(log)).anyTimes()
-    EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
-    EasyMock.replay(logManager)
-
-    // create the replica manager
-    replicaManager = new ReplicaManager(configs.head, metrics, time, kafkaZkClient, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats,
-      new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
-
-    // add the partition with two replicas, both in ISR
-    val partition = replicaManager.createPartition(new TopicPartition(topic, partitionId))
-
-    // create the leader replica with the local log
-    log.updateHighWatermark(partitionHW)
-    partition.leaderReplicaIdOpt = Some(configs.head.brokerId)
-    partition.setLog(log, false)
-
-    // create the follower replica with defined log end offset
-    val followerId = configs(1).brokerId
-    val allReplicas = Seq(configs.head.brokerId, followerId)
-    partition.updateAssignmentAndIsr(
-      assignment = allReplicas,
-      isr = allReplicas.toSet,
-      addingReplicas = Seq.empty,
-      removingReplicas = Seq.empty
-    )
-    val leo = LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
-    partition.updateFollowerFetchState(
-      followerId,
-      followerFetchOffsetMetadata = leo,
-      followerStartOffset = 0L,
-      followerFetchTimeMs= time.milliseconds,
-      leaderEndOffset = leo.messageOffset,
-      partition.localLogOrException.highWatermark)
-  }
-
-  @After
-  def tearDown(): Unit = {
-    replicaManager.shutdown(false)
-    metrics.close()
-  }
-
-  /**
-   * The scenario for this test is that there is one topic that has one partition
-   * with one leader replica on broker "0" and one follower replica on broker "1"
-   * inside the replica manager's metadata.
-   *
-   * The leader replica on "0" has HW of "5" and LEO of "20".  The follower on
-   * broker "1" has a local replica with a HW matching the leader's ("5") and
-   * LEO of "15", meaning it's not in-sync but is still in ISR (hasn't yet expired from ISR).
-   *
-   * When a fetch operation with read committed data turned on is received, the replica manager
-   * should only return data up to the HW of the partition; when a fetch operation with read
-   * committed data turned off is received, the replica manager could return data up to the LEO
-   * of the local leader replica's log.
-   *
-   * This test also verifies counts of fetch requests recorded by the ReplicaManager
-   */
-  @Test
-  def testReadFromLog(): Unit = {

Review comment:
       That test name is a bit misleading. Should we name it `testFetchBeyondHighwatermarkForConsumerAndFollower` or something?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1017,6 +1018,51 @@ class ReplicaManagerTest {
     assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
   }
 
+  @Test
+  def testFetchRequestRateMetrics(): Unit = {
+    val mockTimer = new MockTimer
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
+
+    val tp0 = new TopicPartition(topic, 0)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+    val partition0Replicas = Seq[Integer](0, 1).asJava
+
+    val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+
+    def assertMetricCount(expected: Int): Unit = {
+      assertEquals(expected, replicaManager.brokerTopicStats.allTopicsStats.totalFetchRequestRate.count)
+      assertEquals(expected, replicaManager.brokerTopicStats.topicStats(topic).totalFetchRequestRate.count)
+    }
+
+    val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.empty())
+
+    val nonPurgatoryFetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 0)
+    assertNotNull(nonPurgatoryFetchResult.get)
+    assertEquals(Errors.NONE, nonPurgatoryFetchResult.get.error)
+    assertMetricCount(1)
+
+    val purgatoryFetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10)
+    assertNull(purgatoryFetchResult.get)
+    mockTimer.advanceClock(11)
+    assertNotNull(purgatoryFetchResult.get)
+    assertEquals(Errors.NONE, purgatoryFetchResult.get.error)
+    assertMetricCount(2)

Review comment:
       This would return `3` without the fix?







[GitHub] [kafka] edoardocomar commented on pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
edoardocomar commented on pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#issuecomment-621943779


   cc @mimaison 





[GitHub] [kafka] hachikuji commented on pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#issuecomment-621490434


   The failure in the test case is a problem of metric isolation. I will submit a fix shortly.
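   
   As a generic illustration of what "isolation of metrics between test cases" means, a per-test stats holder avoids counts leaking from one test into the next (hypothetical `StatsStub`, not Kafka's `BrokerTopicStats` and not the fix referenced here):
   
   ```scala
   import org.junit.{After, Before, Test}
   import org.junit.Assert._
   
   class MetricIsolationExampleTest {
     // Hypothetical per-test stats holder; shared, process-wide counters are what make
     // cross-test isolation an issue in the first place.
     final class StatsStub { var totalFetchRequests = 0 }
   
     private var stats: StatsStub = _
   
     @Before
     def setUp(): Unit = stats = new StatsStub    // fresh counters for every test case
   
     @After
     def tearDown(): Unit = stats = null          // nothing carries over to the next test
   
     @Test
     def countStartsFromZero(): Unit = {
       stats.totalFetchRequests += 1
       assertEquals(1, stats.totalFetchRequests)  // would be order-dependent with shared counters
     }
   }
   ```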





[GitHub] [kafka] ijuma commented on a change in pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#discussion_r418679953



##########
File path: core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
##########
@@ -1,206 +0,0 @@
(same SimpleFetchTest.scala deletion diff as quoted in full in the first review comment above; the discussion below concerns the removed `testReadFromLog` test)
Review comment:
       That's fine. Is it worth adding a brief comment stating what it's testing?







[GitHub] [kafka] hachikuji edited a comment on pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji edited a comment on pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#issuecomment-621490434


   The failure in the test case is a problem with the isolation of metrics between test cases. I will submit a fix shortly.





[GitHub] [kafka] hachikuji commented on pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#issuecomment-622593285


   The 32 test failures on jdk11 were due to threads being left behind after a failure in `TransactionsBounceTest`. I will submit a separate fix for this. The other failure seems to be one of the recently flaky streams tests.





[GitHub] [kafka] hachikuji edited a comment on pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji edited a comment on pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#issuecomment-621490434


   The failure in the test case is a problem of test isolation for metrics. I will submit a fix shortly.





[GitHub] [kafka] hachikuji commented on a change in pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#discussion_r418718645



##########
File path: core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
##########
@@ -1,206 +0,0 @@
(same SimpleFetchTest.scala deletion diff as quoted in full in the first review comment above; the discussion below concerns the removed `testReadFromLog` test)
Review comment:
       Borderline overkill I guess, but I added a few comments explaining the test behavior.







[GitHub] [kafka] hachikuji commented on a change in pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#discussion_r418653376



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1017,6 +1018,51 @@ class ReplicaManagerTest {
(same testFetchRequestRateMetrics diff as quoted in full in the earlier review comment above)
Review comment:
       Right.
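       
       A short trace of how the counter reaches 3 under the old placement of the update (illustrative increments only, valid as a Scala script):
       
       ```scala
       var count = 0
       count += 1          // fetch #1: satisfied immediately, marked once in the read path
       count += 1          // fetch #2: initial read before the fetch enters purgatory
       count += 1          // fetch #2 again: re-read when the delayed fetch completes
       assert(count == 3)  // so assertMetricCount(2) would fail without the fix
       ```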







[GitHub] [kafka] hachikuji commented on pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#issuecomment-622487766


   @edoardocomar Thanks for the comment. I agree it is related. Left a comment on #4204.





[GitHub] [kafka] hachikuji commented on a change in pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#discussion_r418655368



##########
File path: core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
##########
@@ -1,206 +0,0 @@
(same SimpleFetchTest.scala deletion diff as quoted in full in the first review comment above; the discussion below concerns the removed `testReadFromLog` test)
Review comment:
       Yeah, that's fair. How about just `testFetchBeyondHighwatermark`?







[GitHub] [kafka] hachikuji edited a comment on pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji edited a comment on pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#issuecomment-621490434


   The failure in the test case is a problem in the isolation of metrics between test cases. I will submit a fix shortly.





[GitHub] [kafka] hachikuji commented on a change in pull request #8586: KAFKA-9939; Fix overcounting delayed fetches in request rate metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8586:
URL: https://github.com/apache/kafka/pull/8586#discussion_r417688549



##########
File path: core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
##########
@@ -1,206 +0,0 @@
(same SimpleFetchTest.scala deletion diff as quoted in full in the first review comment above; the discussion below concerns the removed `testReadFromLog` test)
Review comment:
       I decided to get rid of this. I believe it is already covered by test cases in `ReplicaManagerTest`. See, for example, `testFetchBeyondHighWatermarkReturnEmptyResponse`.



