You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/05/05 04:39:37 UTC
[kafka] branch trunk updated: KAFKA-9731: Disable immediate fetch
response for hw propagation if replica selector is not defined (#8607)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fbfda2c KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is not defined (#8607)
fbfda2c is described below
commit fbfda2c4ad889c731aa52b5214e0521f187f8db6
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon May 4 21:38:53 2020 -0700
KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is not defined (#8607)
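In the case described in the JIRA, there was a 50%+ increase in the total fetch request rate in
2.4.0 due to the immediate fetch response for hw propagation, which this commit disables when
no replica selector is defined.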
I included a few additional clean-ups:
* Simplify `findPreferredReadReplica` and avoid unnecessary collection copies.
* Use `LongSupplier` instead of `Supplier<Long>` in `SubscriptionState` to avoid unnecessary boxing.
Added a unit test to ReplicaManagerTest and cleaned up the test class a bit, including
consistent usage of Time in MockTimer and other components.
Reviewers: Gwen Shapira <gw...@confluent.io>, David Arthur <mu...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/SubscriptionState.java | 8 +-
.../main/scala/kafka/server/ReplicaManager.scala | 57 ++++-----
.../unit/kafka/server/ReplicaManagerTest.scala | 135 +++++++++++++++------
.../scala/unit/kafka/utils/timer/MockTimer.scala | 3 +-
gradle/spotbugs-exclude.xml | 7 ++
5 files changed, 137 insertions(+), 73 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6568c91..5b375da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -39,8 +39,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.function.LongSupplier;
import java.util.function.Predicate;
-import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collector;
import java.util.stream.Collectors;
@@ -516,7 +516,7 @@ public class SubscriptionState {
* @param preferredReadReplicaId The preferred read replica
* @param timeMs The time at which this preferred replica is no longer valid
*/
- public synchronized void updatePreferredReadReplica(TopicPartition tp, int preferredReadReplicaId, Supplier<Long> timeMs) {
+ public synchronized void updatePreferredReadReplica(TopicPartition tp, int preferredReadReplicaId, LongSupplier timeMs) {
assignedState(tp).updatePreferredReadReplica(preferredReadReplicaId, timeMs);
}
@@ -721,10 +721,10 @@ public class SubscriptionState {
}
}
- private void updatePreferredReadReplica(int preferredReadReplica, Supplier<Long> timeMs) {
+ private void updatePreferredReadReplica(int preferredReadReplica, LongSupplier timeMs) {
if (this.preferredReadReplica == null || preferredReadReplica != this.preferredReadReplica) {
this.preferredReadReplica = preferredReadReplica;
- this.preferredReadReplicaExpireTimeMs = timeMs.get();
+ this.preferredReadReplicaExpireTimeMs = timeMs.getAsLong();
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5bdc3d2..b387785 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1053,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig,
metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))
if (preferredReadReplica.isDefined) {
- replicaSelectorOpt.foreach{ selector =>
+ replicaSelectorOpt.foreach { selector =>
debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
s"${preferredReadReplica.get} for $clientMetadata")
}
@@ -1079,9 +1079,9 @@ class ReplicaManager(val config: KafkaConfig,
fetchOnlyFromLeader = fetchOnlyFromLeader,
minOneMessage = minOneMessage)
- // Check if the HW known to the follower is behind the actual HW
- val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId)
- .exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark)
+ // Check if the HW known to the follower is behind the actual HW if a replica selector is defined
+ val followerNeedsHwUpdate = replicaSelectorOpt.isDefined &&
+ partition.getReplica(replicaId).exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark)
val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, replicaId)) {
// If the partition is being throttled, simply return an empty set.
@@ -1170,44 +1170,35 @@ class ReplicaManager(val config: KafkaConfig,
replicaId: Int,
fetchOffset: Long,
currentTimeMs: Long): Option[Int] = {
- if (partition.isLeader) {
- if (Request.isValidBrokerId(replicaId)) {
- // Don't look up preferred for follower fetches via normal replication
- Option.empty
- } else {
+ partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
+ // Don't look up preferred for follower fetches via normal replication
+ if (Request.isValidBrokerId(replicaId))
+ None
+ else {
replicaSelectorOpt.flatMap { replicaSelector =>
- val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition, new ListenerName(clientMetadata.listenerName))
- var replicaInfoSet: Set[ReplicaView] = partition.remoteReplicas
+ val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
+ new ListenerName(clientMetadata.listenerName))
+ val replicaInfos = partition.remoteReplicas
// Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
- .filter(replica => replica.logEndOffset >= fetchOffset)
- .filter(replica => replica.logStartOffset <= fetchOffset)
+ .filter(replica => replica.logEndOffset >= fetchOffset && replica.logStartOffset <= fetchOffset)
.map(replica => new DefaultReplicaView(
replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
replica.logEndOffset,
currentTimeMs - replica.lastCaughtUpTimeMs))
- .toSet
-
- if (partition.leaderReplicaIdOpt.isDefined) {
- val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt
- .map(replicaId => replicaEndpoints.getOrElse(replicaId, Node.noNode()))
- .map(leaderNode => new DefaultReplicaView(leaderNode, partition.localLogOrException.logEndOffset, 0L))
- .get
- replicaInfoSet ++= Set(leaderReplica)
-
- val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
- replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala
- .filter(!_.endpoint.isEmpty)
- // Even though the replica selector can return the leader, we don't want to send it out with the
- // FetchResponse, so we exclude it here
- .filter(!_.equals(leaderReplica))
- .map(_.endpoint.id)
- } else {
- None
+
+ val leaderReplica = new DefaultReplicaView(
+ replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()),
+ partition.localLogOrException.logEndOffset, 0L)
+ val replicaInfoSet = mutable.Set[ReplicaView]() ++= replicaInfos += leaderReplica
+
+ val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
+ replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect {
+ // Even though the replica selector can return the leader, we don't want to send it out with the
+ // FetchResponse, so we exclude it here
+ case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
}
}
}
- } else {
- None
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0ed04d5..09e4f14 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -221,7 +221,7 @@ class ReplicaManagerTest {
}
private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
- val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
@@ -277,7 +277,7 @@ class ReplicaManagerTest {
@Test
def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
- val timer = new MockTimer
+ val timer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
try {
@@ -337,7 +337,7 @@ class ReplicaManagerTest {
@Test
def testReadCommittedFetchLimitedAtLSO(): Unit = {
- val timer = new MockTimer
+ val timer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
try {
@@ -444,7 +444,7 @@ class ReplicaManagerTest {
@Test
def testDelayedFetchIncludesAbortedTransactions(): Unit = {
- val timer = new MockTimer
+ val timer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
try {
@@ -521,7 +521,7 @@ class ReplicaManagerTest {
@Test
def testFetchBeyondHighWatermark(): Unit = {
- val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2))
+ val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
try {
val brokerList = Seq[Integer](0, 1, 2).asJava
@@ -579,7 +579,7 @@ class ReplicaManagerTest {
val maxFetchBytes = 1024 * 1024
val aliveBrokersIds = Seq(0, 1)
val leaderEpoch = 5
- val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokersIds)
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokersIds)
try {
val tp = new TopicPartition(topic, 0)
val replicas = aliveBrokersIds.toList.map(Int.box).asJava
@@ -677,7 +677,7 @@ class ReplicaManagerTest {
*/
@Test
def testFetchMessagesWhenNotFollowerForOnePartition(): Unit = {
- val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2))
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
try {
// Create 2 partitions, assign replica 0 as the leader for both a different follower (1 and 2) for each
@@ -791,8 +791,9 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
- val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
- topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
+ val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+ topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
+ expectTruncation = true, localLogOffset = Some(10))
// Initialize partition state to follower, with leader = 1, leaderEpoch = 1
val tp = new TopicPartition(topic, topicPartition)
@@ -830,7 +831,7 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
- val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+ val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
@@ -863,7 +864,7 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
- val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+ val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
@@ -912,7 +913,7 @@ class ReplicaManagerTest {
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
- val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+ val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
@@ -951,6 +952,70 @@ class ReplicaManagerTest {
assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
}
+ @Test
+ def testFollowerFetchWithDefaultSelectorNoForcedHwPropagation(): Unit = {
+ val topicPartition = 0
+ val followerBrokerId = 0
+ val leaderBrokerId = 1
+ val leaderEpoch = 1
+ val leaderEpochIncrement = 2
+ val countDownLatch = new CountDownLatch(1)
+ val timer = new MockTimer(time)
+
+ // Prepare the mocked components for the test
+ val (replicaManager, _) = prepareReplicaManagerAndLogManager(timer,
+ topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+ leaderBrokerId, countDownLatch, expectTruncation = true)
+
+ val brokerList = Seq[Integer](0, 1).asJava
+
+ val tp0 = new TopicPartition(topic, 0)
+
+ replicaManager.createPartition(new TopicPartition(topic, 0))
+
+ // Make this replica the follower
+ val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(topic)
+ .setPartitionIndex(0)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(1)
+ .setIsr(brokerList)
+ .setZkVersion(0)
+ .setReplicas(brokerList)
+ .setIsNew(false)).asJava,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+
+ val simpleRecords = Seq(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+ val appendResult = appendRecords(replicaManager, tp0,
+ MemoryRecords.withRecords(CompressionType.NONE, simpleRecords.toSeq: _*), AppendOrigin.Client)
+
+ // Increment the hw in the leader by fetching from the last offset
+ val fetchOffset = simpleRecords.size
+ var followerResult = fetchAsFollower(replicaManager, tp0,
+ new PartitionData(fetchOffset, 0, 100000, Optional.empty()),
+ clientMetadata = None)
+ assertTrue(followerResult.isFired)
+ assertEquals(0, followerResult.assertFired.highWatermark)
+
+ assertTrue("Expected producer request to be acked", appendResult.isFired)
+
+ // Fetch from the same offset, no new data is expected and hence the fetch request should
+ // go to the purgatory
+ followerResult = fetchAsFollower(replicaManager, tp0,
+ new PartitionData(fetchOffset, 0, 100000, Optional.empty()),
+ clientMetadata = None, minBytes = 1000)
+ assertFalse("Request completed immediately unexpectedly", followerResult.isFired)
+
+ // Complete the request in the purgatory by advancing the clock
+ timer.advanceClock(1001)
+ assertTrue(followerResult.isFired)
+
+ assertEquals(fetchOffset, followerResult.assertFired.highWatermark)
+ }
+
@Test(expected = classOf[ClassNotFoundException])
def testUnknownReplicaSelector(): Unit = {
val topicPartition = 0
@@ -962,7 +1027,7 @@ class ReplicaManagerTest {
val props = new Properties()
props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class")
- prepareReplicaManagerAndLogManager(
+ prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props)
}
@@ -976,7 +1041,7 @@ class ReplicaManagerTest {
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
- val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+ val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
assertFalse(replicaManager.replicaSelectorOpt.isDefined)
@@ -984,7 +1049,7 @@ class ReplicaManagerTest {
@Test
def testFetchFollowerNotAllowedForOlderClients(): Unit = {
- val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
@@ -1022,7 +1087,7 @@ class ReplicaManagerTest {
@Test
def testFetchRequestRateMetrics(): Unit = {
- val mockTimer = new MockTimer
+ val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@@ -1067,7 +1132,7 @@ class ReplicaManagerTest {
@Test
def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = {
- val mockTimer = new MockTimer
+ val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@@ -1115,7 +1180,7 @@ class ReplicaManagerTest {
@Test
def testBecomeFollowerWhileNewClientFetchInPurgatory(): Unit = {
- val mockTimer = new MockTimer
+ val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@@ -1164,7 +1229,7 @@ class ReplicaManagerTest {
@Test
def testFetchFromLeaderAlwaysAllowed(): Unit = {
- val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
@@ -1205,7 +1270,7 @@ class ReplicaManagerTest {
// In this case, we should ensure that pending purgatory operations are cancelled
// immediately rather than sitting around to timeout.
- val mockTimer = new MockTimer
+ val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@@ -1244,7 +1309,7 @@ class ReplicaManagerTest {
@Test
def testClearProducePurgatoryOnStopReplica(): Unit = {
- val mockTimer = new MockTimer
+ val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@@ -1329,22 +1394,22 @@ class ReplicaManagerTest {
* ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing
* 'leaderEpochInLeaderAndIsr' leader epoch for partition 'topicPartition'.
*/
- private def prepareReplicaManagerAndLogManager(topicPartition: Int,
+ private def prepareReplicaManagerAndLogManager(timer: MockTimer,
+ topicPartition: Int,
leaderEpochInLeaderAndIsr: Int,
followerBrokerId: Int,
leaderBrokerId: Int,
countDownLatch: CountDownLatch,
expectTruncation: Boolean,
+ localLogOffset: Option[Long] = None,
+ offsetFromLeader: Long = 5,
+ leaderEpochFromLeader: Int = 3,
extraProps: Properties = new Properties()) : (ReplicaManager, LogManager) = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props.asScala ++= extraProps.asScala
val config = KafkaConfig.fromProps(props)
- // Setup mock local log to have leader epoch of 3 and offset of 10
- val localLogOffset = 10
- val offsetFromLeader = 5
- val leaderEpochFromLeader = 3
val mockScheduler = new MockScheduler(time)
val mockBrokerTopicStats = new BrokerTopicStats
val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
@@ -1365,14 +1430,17 @@ class ReplicaManagerTest {
override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
assertEquals(leaderEpoch, leaderEpochFromLeader)
- Some(OffsetAndEpoch(localLogOffset, leaderEpochFromLeader))
+ localLogOffset.map { logOffset =>
+ Some(OffsetAndEpoch(logOffset, leaderEpochFromLeader))
+ }.getOrElse(super.endOffsetForEpoch(leaderEpoch))
}
override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader)
- override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
+ override def logEndOffsetMetadata: LogOffsetMetadata =
+ localLogOffset.map(LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata)
- override def logEndOffset: Long = localLogOffset
+ override def logEndOffset: Long = localLogOffset.getOrElse(super.logEndOffset)
}
// Expect to call LogManager.truncateTo exactly once
@@ -1414,7 +1482,6 @@ class ReplicaManagerTest {
.anyTimes()
EasyMock.replay(metadataCache)
- val timer = new MockTimer
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
@@ -1822,7 +1889,7 @@ class ReplicaManagerTest {
@Test
def testStopReplicaWithStaleControllerEpoch(): Unit = {
- val mockTimer = new MockTimer
+ val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@@ -1848,7 +1915,7 @@ class ReplicaManagerTest {
@Test
def testStopReplicaWithOfflinePartition(): Unit = {
- val mockTimer = new MockTimer
+ val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@@ -1890,7 +1957,7 @@ class ReplicaManagerTest {
}
private def testStopReplicaWithInexistentPartition(deletePartitions: Boolean, throwIOException: Boolean): Unit = {
- val mockTimer = new MockTimer
+ val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
@@ -1983,7 +2050,7 @@ class ReplicaManagerTest {
deletePartition: Boolean,
throwIOException: Boolean,
expectedOutput: Errors): Unit = {
- val mockTimer = new MockTimer
+ val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val tp0 = new TopicPartition(topic, 0)
diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
index 8805b11..819954a 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
@@ -20,9 +20,8 @@ import kafka.utils.MockTime
import scala.collection.mutable
-class MockTimer extends Timer {
+class MockTimer(val time: MockTime = new MockTime) extends Timer {
- val time = new MockTime
private val taskQueue = mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse)
def add(timerTask: TimerTask): Unit = {
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 86cc464..6e9a6c1 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -156,6 +156,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
+ <Source name="ReplicaManager.scala"/>
+ <Package name="kafka.server"/>
+ <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
+ </Match>
+
+ <Match>
+ <!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="LogManager.scala"/>
<Package name="kafka.log"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>