Posted to commits@kafka.apache.org by ju...@apache.org on 2021/02/11 01:19:40 UTC

[kafka] branch 2.8 updated: MINOR: Prevent creating partition.metadata until ID can be written (#10100)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 79fda69  MINOR: Prevent creating partition.metadata until ID can be written (#10100)
79fda69 is described below

commit 79fda69365147f899012b1d74fb3d08543dbb6ca
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Wed Feb 10 20:18:24 2021 -0500

    MINOR: Prevent creating partition.metadata until ID can be written (#10100)
    
    Currently the partition.metadata file is created when the log is created. However, clusters with older inter-broker protocols will never use this file. This PR moves the creation of the file to the point at which we first write to it.
    
    This PR also deletes the partition.metadata file on startup if the IBP version is lower than 2.8.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            | 29 +++++++++++++++-------
 core/src/main/scala/kafka/log/LogManager.scala     | 15 +++++++----
 core/src/main/scala/kafka/raft/RaftManager.scala   |  3 ++-
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../scala/kafka/server/PartitionMetadataFile.scala | 14 +++++------
 .../main/scala/kafka/server/ReplicaManager.scala   |  7 +++---
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  8 +++---
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  9 +++----
 .../unit/kafka/server/ReplicaManagerTest.scala     | 25 ++++++++-----------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  3 ++-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  3 ++-
 .../partition/PartitionMakeFollowerBenchmark.java  |  3 ++-
 .../UpdateFollowerFetchStateBenchmark.java         |  3 ++-
 13 files changed, 70 insertions(+), 54 deletions(-)
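
At a high level, the startup path now treats partition.metadata as optional: it is read only when topic IDs are supported, and deleted otherwise. Below is a minimal standalone sketch of that recovery decision; the object and method names and the "version: 0" / "topic_id: <uuid>" file layout are assumptions for illustration, not the Kafka classes themselves.

    import java.io.File
    import java.nio.file.Files
    import java.util.UUID

    object TopicIdRecoverySketch {
      // Mirrors the recovery step in Log.loadSegments() after this change.
      // keepPartitionMetadataFile is false when the inter-broker protocol is below 2.8.
      def recoverTopicId(metadataFile: File, keepPartitionMetadataFile: Boolean): Option[UUID] = {
        if (!metadataFile.exists())
          None                                   // nothing written yet; the file is no longer created eagerly
        else if (!keepPartitionMetadataFile) {
          metadataFile.delete()                  // stale file from a 2.8 run; drop it to avoid ID conflicts on re-upgrade
          None
        } else {
          val idLine = Files.readAllLines(metadataFile.toPath).get(1)  // assumed "topic_id: <uuid>" line
          Some(UUID.fromString(idLine.split(": ")(1)))
        }
      }
    }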

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index aa9b739..2249b5e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -242,6 +242,12 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
  * @param hadCleanShutdown boolean flag to indicate if the Log had a clean/graceful shutdown last time. true means
  *                         clean shutdown whereas false means a crash.
+ * @param keepPartitionMetadataFile boolean flag to indicate whether the partition.metadata file should be kept in the
+ *                                  log directory. A partition.metadata file is only created when the controller's
+ *                                  inter-broker protocol version is at least 2.8. This file will persist the topic ID on
+ *                                  the broker. If inter-broker protocol is downgraded below 2.8, a topic ID may be lost
+ *                                  and a new ID generated upon re-upgrade. If the inter-broker protocol version is below
+ *                                  2.8, partition.metadata will be deleted to avoid ID conflicts upon re-upgrade.
  */
 @threadsafe
 class Log(@volatile private var _dir: File,
@@ -256,7 +262,8 @@ class Log(@volatile private var _dir: File,
           val topicPartition: TopicPartition,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
-          private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {
+          private val hadCleanShutdown: Boolean = true,
+          val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
 
@@ -307,7 +314,7 @@ class Log(@volatile private var _dir: File,
   // Visible for testing
   @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
 
-  @volatile var partitionMetadataFile : Option[PartitionMetadataFile] = None
+  @volatile var partitionMetadataFile : PartitionMetadataFile = null
 
   @volatile var topicId : Uuid = Uuid.ZERO_UUID
 
@@ -341,10 +348,13 @@ class Log(@volatile private var _dir: File,
     producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
     loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)
 
-    // Recover topic ID if present
-    partitionMetadataFile.foreach { file =>
-      if (!file.isEmpty())
-        topicId = file.read().topicId
+    // Delete partition metadata file if the version does not support topic IDs.
+    // Recover topic ID if present and topic IDs are supported
+    if (partitionMetadataFile.exists()) {
+        if (!keepPartitionMetadataFile)
+          partitionMetadataFile.delete()
+        else
+          topicId = partitionMetadataFile.read().topicId
     }
   }
 
@@ -564,7 +574,7 @@ class Log(@volatile private var _dir: File,
 
   private def initializePartitionMetadata(): Unit = lock synchronized {
     val partitionMetadata = PartitionMetadataFile.newFile(dir)
-    partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel))
+    partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)
   }
 
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
@@ -2563,11 +2573,12 @@ object Log {
             maxProducerIdExpirationMs: Int,
             producerIdExpirationCheckIntervalMs: Int,
             logDirFailureChannel: LogDirFailureChannel,
-            lastShutdownClean: Boolean = true): Log = {
+            lastShutdownClean: Boolean = true,
+            keepPartitionMetadataFile: Boolean = true): Log = {
     val topicPartition = Log.parseTopicPartitionName(dir)
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,
-      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel, lastShutdownClean)
+      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel, lastShutdownClean, keepPartitionMetadataFile)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index b788bf0..acb9d34 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -62,7 +62,8 @@ class LogManager(logDirs: Seq[File],
                  scheduler: Scheduler,
                  brokerTopicStats: BrokerTopicStats,
                  logDirFailureChannel: LogDirFailureChannel,
-                 time: Time) extends Logging with KafkaMetricsGroup {
+                 time: Time,
+                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
 
   import LogManager._
 
@@ -268,7 +269,8 @@ class LogManager(logDirs: Seq[File],
       time = time,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
-      lastShutdownClean = hadCleanShutdown)
+      lastShutdownClean = hadCleanShutdown,
+      keepPartitionMetadataFile = keepPartitionMetadataFile)
 
     if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
       addLogToBeDeleted(log)
@@ -824,7 +826,8 @@ class LogManager(logDirs: Seq[File],
           scheduler = scheduler,
           time = time,
           brokerTopicStats = brokerTopicStats,
-          logDirFailureChannel = logDirFailureChannel)
+          logDirFailureChannel = logDirFailureChannel,
+          keepPartitionMetadataFile = keepPartitionMetadataFile)
 
         if (isFuture)
           futureLogs.put(topicPartition, log)
@@ -1208,7 +1211,8 @@ object LogManager {
             kafkaScheduler: KafkaScheduler,
             time: Time,
             brokerTopicStats: BrokerTopicStats,
-            logDirFailureChannel: LogDirFailureChannel): LogManager = {
+            logDirFailureChannel: LogDirFailureChannel,
+            keepPartitionMetadataFile: Boolean): LogManager = {
     val defaultProps = LogConfig.extractLogConfigMap(config)
 
     LogConfig.validateValues(defaultProps)
@@ -1230,7 +1234,8 @@ object LogManager {
       scheduler = kafkaScheduler,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
-      time = time)
+      time = time,
+      keepPartitionMetadataFile = keepPartitionMetadataFile)
   }
 
 }
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index f10df1f..b9a77b7 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -230,7 +230,8 @@ class KafkaRaftManager[T](
       time = time,
       maxProducerIdExpirationMs = config.transactionalIdExpirationMs,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-      logDirFailureChannel = new LogDirFailureChannel(5)
+      logDirFailureChannel = new LogDirFailureChannel(5),
+      keepPartitionMetadataFile = config.usesTopicId
     )
 
     KafkaMetadataLog(log, topicPartition)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7aed40d..7ec7a29 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -246,7 +246,7 @@ class KafkaServer(
         /* start log manager */
         logManager = LogManager(config, initialOfflineDirs,
           new ZkConfigRepository(new AdminZkClient(zkClient)),
-          kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
+          kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, config.usesTopicId)
         brokerState.set(BrokerState.RECOVERY)
         logManager.startup(zkClient.getAllTopicsInCluster())
 
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 1adcbc3..25b1ba6 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
-import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+import java.nio.file.{Files, Paths}
 import java.util.regex.Pattern
 
 import kafka.utils.Logging
@@ -91,10 +91,6 @@ class PartitionMetadataFile(val file: File,
   private val lock = new Object()
   private val logDir = file.getParentFile.getParent
 
-
-  try Files.createFile(file.toPath) // create the file if it doesn't exist
-  catch { case _: FileAlreadyExistsException => }
-
   def write(topicId: Uuid): Unit = {
     lock synchronized {
       try {
@@ -138,7 +134,11 @@ class PartitionMetadataFile(val file: File,
     }
   }
 
-  def isEmpty(): Boolean = {
-    file.length() == 0
+  def exists(): Boolean = {
+    file.exists()
+  }
+
+  def delete(): Boolean = {
+    file.delete()
   }
 }
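
With the eager Files.createFile call removed from the constructor, partition.metadata only comes into existence on the first write(). A generic sketch of that write-to-temp-file-then-rename pattern follows; the object name, method name, and file layout are illustrative assumptions, not the class's exact code.

    import java.io.File
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, StandardCopyOption}
    import java.util.UUID

    object PartitionMetadataWriteSketch {
      // Writing creates the file: flush the content to a temp file first, then move it
      // over partition.metadata so readers never observe a partially written file.
      def writePartitionMetadata(file: File, topicId: UUID): Unit = {
        val temp = new File(file.getAbsolutePath + ".tmp")
        val content = s"version: 0\ntopic_id: $topicId\n"
        Files.write(temp.toPath, content.getBytes(StandardCharsets.UTF_8))
        Files.move(temp.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING)
      }
    }

Because the file is only materialized inside write(), brokers running an older inter-broker protocol never produce it in the first place.
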
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3f52e22..ba50c86 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1424,17 +1424,18 @@ class ReplicaManager(val config: KafkaConfig,
            * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
            * we need to map this topic-partition to OfflinePartition instead.
            */
-            if (localLog(topicPartition).isEmpty)
+            val local = localLog(topicPartition)
+            if (local.isEmpty)
               markPartitionOffline(topicPartition)
             else {
               val id = topicIds.get(topicPartition.topic())
               // Ensure we have not received a request from an older protocol
               if (id != null && !id.equals(Uuid.ZERO_UUID)) {
-                val log = localLog(topicPartition).get
+                val log = local.get
                 // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
                 // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
                 if (log.topicId.equals(Uuid.ZERO_UUID)) {
-                  log.partitionMetadataFile.get.write(id)
+                  log.partitionMetadataFile.write(id)
                   log.topicId = id
                   // Warn if the topic ID in the request does not match the log.
                 } else if (!log.topicId.equals(id)) {
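
For context, the file is now written only for requests that actually carry a topic ID, and only the first time the broker sees it for that log. A condensed sketch of that decision, with stand-in names rather than ReplicaManager's own:

    import java.util.UUID

    object TopicIdWriteDecisionSketch {
      val ZeroUuid = new UUID(0L, 0L)  // stand-in for Uuid.ZERO_UUID

      // True only when the request carries a real topic ID and the log has not stored one yet.
      def shouldWriteTopicId(requestTopicId: Option[UUID], topicIdInLog: UUID): Boolean =
        requestTopicId.exists(id => id != ZeroUuid && topicIdInLog == ZeroUuid)
    }
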
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 01ca38c..c3698dc 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -229,8 +229,8 @@ class LogManagerTest {
       s.lazyTimeIndex.get
     })
 
-    // there should be a log file, two indexes, one producer snapshot, partition metadata, and the leader epoch checkpoint
-    assertEquals(log.numberOfSegments * 4 + 2, log.dir.list.length, "Files should have been deleted")
+    // there should be a log file, two indexes, one producer snapshot, and the leader epoch checkpoint
+    assertEquals(log.numberOfSegments * 4 + 1, log.dir.list.length, "Files should have been deleted")
     assertEquals(0, readLog(log, offset + 1).records.sizeInBytes, "Should get empty fetch off new log.")
 
     assertThrows(classOf[OffsetOutOfRangeException], () => readLog(log, 0), () => "Should get exception from fetching earlier.")
@@ -274,8 +274,8 @@ class LogManagerTest {
     time.sleep(log.config.fileDeleteDelayMs + 1)
 
     // there should be a log file, two indexes (the txn index is created lazily),
-    // and a producer snapshot file per segment, and the leader epoch checkpoint and partition metadata file.
-    assertEquals(log.numberOfSegments * 4 + 2, log.dir.list.length, "Files should have been deleted")
+    // and a producer snapshot file per segment, and the leader epoch checkpoint.
+    assertEquals(log.numberOfSegments * 4 + 1, log.dir.list.length, "Files should have been deleted")
     assertEquals(0, readLog(log, offset + 1).records.sizeInBytes, "Should get empty fetch off new log.")
     assertThrows(classOf[OffsetOutOfRangeException], () => readLog(log, 0))
     // log should still be appendable
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b39dd7a..dab9eb1 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -98,7 +98,7 @@ class LogTest {
         initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
         flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L,
         retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time,
-        brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size)) {
+        brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size), keepPartitionMetadataFile = config.usesTopicId) {
 
          override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
                      logStartOffsets: Map[TopicPartition, Long], topicConfigs: Map[String, LogConfig]): Log = {
@@ -2530,7 +2530,7 @@ class LogTest {
     var log = createLog(logDir, logConfig)
 
     val topicId = Uuid.randomUuid()
-    log.partitionMetadataFile.get.write(topicId)
+    log.partitionMetadataFile.write(topicId)
     log.close()
 
     // test recovery case
@@ -3100,7 +3100,7 @@ class LogTest {
     // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
     val id = Uuid.randomUuid()
     log.topicId = id
-    log.partitionMetadataFile.get.write(id)
+    log.partitionMetadataFile.write(id)
 
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
     assertEquals(Some(5), log.latestEpoch)
@@ -3115,8 +3115,7 @@ class LogTest {
 
     // Check the topic ID remains in memory and was copied correctly.
     assertEquals(id, log.topicId)
-    assertFalse(log.partitionMetadataFile.isEmpty)
-    assertEquals(id, log.partitionMetadataFile.get.read().topicId)
+    assertEquals(id, log.partitionMetadataFile.read().topicId)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a21a016..c31bf8e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2248,9 +2248,8 @@ class ReplicaManagerTest {
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val id = topicIds.get(topicPartition.topic())
       val log = replicaManager.localLog(topicPartition).get
-      assertFalse(log.partitionMetadataFile.isEmpty)
-      assertFalse(log.partitionMetadataFile.get.isEmpty())
-      val partitionMetadata = log.partitionMetadataFile.get.read()
+      assertTrue(log.partitionMetadataFile.exists())
+      val partitionMetadata = log.partitionMetadataFile.read()
 
       // Current version of PartitionMetadataFile is 0.
       assertEquals(0, partitionMetadata.version)
@@ -2285,33 +2284,29 @@ class ReplicaManagerTest {
         topicIds,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build().serialize(), version)
 
-      // The file has no contents if the topic does not have an associated topic ID.
+      // There is no file if the topic does not have an associated topic ID.
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val log = replicaManager.localLog(topicPartition).get
-      assertFalse(log.partitionMetadataFile.isEmpty)
-      assertTrue(log.partitionMetadataFile.get.isEmpty())
+      assertFalse(log.partitionMetadataFile.exists())
 
-      // The file has no contents if the topic has the default UUID.
+      // There is no file if the topic has the default UUID.
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val log2 = replicaManager.localLog(topicPartition).get
-      assertFalse(log2.partitionMetadataFile.isEmpty)
-      assertTrue(log2.partitionMetadataFile.get.isEmpty())
+      assertFalse(log2.partitionMetadataFile.exists())
 
-      // The file has no contents if the request is an older version
+      // There is no file if the request is an older version
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
       val log3 = replicaManager.localLog(topicPartitionFoo).get
-      assertFalse(log3.partitionMetadataFile.isEmpty)
-      assertTrue(log3.partitionMetadataFile.get.isEmpty())
+      assertFalse(log3.partitionMetadataFile.exists())
 
-      // The file has no contents if the request is an older version
+      // There is no file if the request is an older version
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 4), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
       val log4 = replicaManager.localLog(topicPartitionFoo).get
-      assertFalse(log4.partitionMetadataFile.isEmpty)
-      assertTrue(log4.partitionMetadataFile.get.isEmpty())
+      assertFalse(log4.partitionMetadataFile.exists())
     } finally replicaManager.shutdown(checkpointHW = false)
   }
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6a7db80..43df2b9 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1089,7 +1089,8 @@ object TestUtils extends Logging {
                    scheduler = time.scheduler,
                    time = time,
                    brokerTopicStats = new BrokerTopicStats,
-                   logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
+                   logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
+                   keepPartitionMetadataFile = true)
   }
 
   class MockAlterIsrManager extends AlterIsrManager {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 8cc30fc..424b8df 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -134,7 +134,8 @@ public class ReplicaFetcherThreadBenchmark {
                 scheduler,
                 brokerTopicStats,
                 logDirFailureChannel,
-                Time.SYSTEM);
+                Time.SYSTEM,
+                true);
 
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<BaseRecords>> initialFetched = new LinkedHashMap<>();
         scala.collection.mutable.Map<TopicPartition, InitialFetchState> initialFetchStates = new scala.collection.mutable.HashMap<>();
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 9f33ceb..ece6f86 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -110,7 +110,8 @@ public class PartitionMakeFollowerBenchmark {
             scheduler,
             brokerTopicStats,
             logDirFailureChannel,
-            Time.SYSTEM);
+            Time.SYSTEM,
+            true);
 
         TopicPartition tp = new TopicPartition("topic", 0);
 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 1230a1c..a82c6a0 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -95,7 +95,8 @@ public class UpdateFollowerFetchStateBenchmark {
                 scheduler,
                 brokerTopicStats,
                 logDirFailureChannel,
-                Time.SYSTEM);
+                Time.SYSTEM,
+                true);
         OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
         Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), topicPartition)).thenReturn(Option.apply(0L));
         DelayedOperations delayedOperations = new DelayedOperationsMock();