You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/11 02:00:19 UTC

[kafka] branch trunk updated: KAFKA-8333; Load high watermark checkpoint lazily when initializing replicas (#6800)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 783ab74  KAFKA-8333; Load high watermark checkpoint lazily when initializing replicas (#6800)
783ab74 is described below

commit 783ab74793cc7e541e6b8ed4e1c545bf5dcff959
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 10 18:59:59 2019 -0700

    KAFKA-8333; Load high watermark checkpoint lazily when initializing replicas (#6800)
    
    Currently we read the high watermark checkpoint file separately for every replica we initialize. This patch makes that loading lazy and caches the loaded offsets for reuse while a LeaderAndIsr request is being handled.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  6 ++--
 .../server/checkpoints/OffsetCheckpointFile.scala  | 23 +++++++++++---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 20 ++++++------
 .../checkpoints/OffsetCheckpointFileTest.scala     | 36 ++++++++++++++++++++++
 4 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8b383c2..64b6bee 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,7 +29,7 @@ import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.server.checkpoints.{OffsetCheckpointFile, OffsetCheckpoints, SimpleOffsetCheckpoints}
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.ElectionType
@@ -604,7 +604,7 @@ class ReplicaManager(val config: KafkaConfig,
           //   start ReplicaAlterDirThread to move data of this partition from the current log to the future log
           // - Otherwise, return KafkaStorageException. We do not create the future log while there is offline log directory
           //   so that we can avoid creating future log for the same partition in multiple log directories.
-          val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+          val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
           if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
             val futureReplica = futureLocalReplicaOrException(topicPartition)
             logManager.abortAndPauseCleaning(topicPartition)
@@ -1109,7 +1109,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
         val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
 
-        val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
         val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
           makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
             highWatermarkCheckpoints)
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 715f42f..69e62d2 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -66,13 +66,26 @@ trait OffsetCheckpoints {
   def fetch(logDir: String, topicPartition: TopicPartition): Option[Long]
 }
 
-class SimpleOffsetCheckpoints(checkpointFilesByLogDir: Map[String, OffsetCheckpointFile])
-  extends OffsetCheckpoints {
+/**
+ * Loads checkpoint files on demand and caches the offsets for reuse.
+ */
+class LazyOffsetCheckpoints(checkpointsByLogDir: Map[String, OffsetCheckpointFile]) extends OffsetCheckpoints {
+  private val lazyCheckpointsByLogDir = checkpointsByLogDir.map { case (logDir, checkpointFile) =>
+    logDir -> new LazyOffsetCheckpointMap(checkpointFile)
+  }.toMap
 
   override def fetch(logDir: String, topicPartition: TopicPartition): Option[Long] = {
-    val checkpoint = checkpointFilesByLogDir(logDir)
-    val offsetMap = checkpoint.read()
-    offsetMap.get(topicPartition)
+    val offsetCheckpointFile = lazyCheckpointsByLogDir.getOrElse(logDir,
+      throw new IllegalArgumentException(s"No checkpoint file for log dir $logDir"))
+    offsetCheckpointFile.fetch(topicPartition)
+  }
+}
+
+class LazyOffsetCheckpointMap(checkpoint: OffsetCheckpointFile) {
+  private lazy val offsets: Map[TopicPartition, Long] = checkpoint.read()
+
+  def fetch(topicPartition: TopicPartition): Option[Long] = {
+    offsets.get(topicPartition)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 59248f0..64a29ed 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
 import kafka.cluster.BrokerEndPoint
-import kafka.server.checkpoints.SimpleOffsetCheckpoints
+import kafka.server.checkpoints.LazyOffsetCheckpoints
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
@@ -87,7 +87,7 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(1, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
@@ -106,7 +106,7 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(1, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
@@ -160,7 +160,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         collection.immutable.Map(new TopicPartition(topic, 0) ->
@@ -204,7 +204,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -255,7 +255,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -351,7 +351,7 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -417,7 +417,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+      partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -552,7 +552,7 @@ class ReplicaManagerTest {
       // Create 2 partitions, assign replica 0 as the leader for both a different follower (1 and 2) for each
       val tp0 = new TopicPartition(topic, 0)
       val tp1 = new TopicPartition(topic, 1)
-      val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
       replicaManager.createPartition(tp0).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
       replicaManager.createPartition(tp1).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
       val partition0Replicas = Seq[Integer](0, 1).asJava
@@ -645,7 +645,7 @@ class ReplicaManagerTest {
 
     // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
     val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition))
-    val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
     partition.getOrCreateReplica(followerBrokerId, isNew = false, offsetCheckpoints)
     partition.makeFollower(controllerId,
       leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
index 2d20674..99a40b6 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
@@ -22,6 +22,8 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.junit.Assert._
 import org.junit.Test
+import org.mockito.Mockito
+import org.scalatest.Assertions.assertThrows
 
 import scala.collection.Map
 
@@ -98,4 +100,38 @@ class OffsetCheckpointFileTest extends Logging {
     new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read()
   }
 
+  @Test
+  def testLazyOffsetCheckpoint(): Unit = {
+    val logDir = "/tmp/kafka-logs"
+    val mockCheckpointFile = Mockito.mock(classOf[OffsetCheckpointFile])
+
+    val lazyCheckpoints = new LazyOffsetCheckpoints(Map(logDir -> mockCheckpointFile))
+    Mockito.verify(mockCheckpointFile, Mockito.never()).read()
+
+    val partition0 = new TopicPartition("foo", 0)
+    val partition1 = new TopicPartition("foo", 1)
+    val partition2 = new TopicPartition("foo", 2)
+
+    Mockito.when(mockCheckpointFile.read()).thenReturn(Map(
+      partition0 -> 1000L,
+      partition1 -> 2000L
+    ))
+
+    assertEquals(Some(1000L), lazyCheckpoints.fetch(logDir, partition0))
+    assertEquals(Some(2000L), lazyCheckpoints.fetch(logDir, partition1))
+    assertEquals(None, lazyCheckpoints.fetch(logDir, partition2))
+
+    Mockito.verify(mockCheckpointFile, Mockito.times(1)).read()
+  }
+
+  @Test
+  def testLazyOffsetCheckpointFileInvalidLogDir(): Unit = {
+    val logDir = "/tmp/kafka-logs"
+    val mockCheckpointFile = Mockito.mock(classOf[OffsetCheckpointFile])
+    val lazyCheckpoints = new LazyOffsetCheckpoints(Map(logDir -> mockCheckpointFile))
+    assertThrows[IllegalArgumentException] {
+      lazyCheckpoints.fetch("/invalid/kafka-logs", new TopicPartition("foo", 0))
+    }
+  }
+
 }