You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/11 02:00:19 UTC
[kafka] branch trunk updated: KAFKA-8333; Load high watermark checkpoint lazily when initializing replicas (#6800)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 783ab74 KAFKA-8333; Load high watermark checkpoint lazily when initializing replicas (#6800)
783ab74 is described below
commit 783ab74793cc7e541e6b8ed4e1c545bf5dcff959
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 10 18:59:59 2019 -0700
KAFKA-8333; Load high watermark checkpoint lazily when initializing replicas (#6800)
Currently we load the high watermark checkpoint separately for every replica that we load. This patch makes this loading logic lazy and caches the loaded map while a LeaderAndIsr request is being handled.
Reviewers: Jun Rao <ju...@gmail.com>
---
.../main/scala/kafka/server/ReplicaManager.scala | 6 ++--
.../server/checkpoints/OffsetCheckpointFile.scala | 23 +++++++++++---
.../unit/kafka/server/ReplicaManagerTest.scala | 20 ++++++------
.../checkpoints/OffsetCheckpointFileTest.scala | 36 ++++++++++++++++++++++
4 files changed, 67 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8b383c2..64b6bee 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,7 +29,7 @@ import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.server.checkpoints.{OffsetCheckpointFile, OffsetCheckpoints, SimpleOffsetCheckpoints}
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.ElectionType
@@ -604,7 +604,7 @@ class ReplicaManager(val config: KafkaConfig,
// start ReplicaAlterDirThread to move data of this partition from the current log to the future log
// - Otherwise, return KafkaStorageException. We do not create the future log while there is offline log directory
// so that we can avoid creating future log for the same partition in multiple log directories.
- val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+ val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
val futureReplica = futureLocalReplicaOrException(topicPartition)
logManager.abortAndPauseCleaning(topicPartition)
@@ -1109,7 +1109,7 @@ class ReplicaManager(val config: KafkaConfig,
}
val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
- val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+ val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
highWatermarkCheckpoints)
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 715f42f..69e62d2 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -66,13 +66,26 @@ trait OffsetCheckpoints {
def fetch(logDir: String, topicPartition: TopicPartition): Option[Long]
}
-class SimpleOffsetCheckpoints(checkpointFilesByLogDir: Map[String, OffsetCheckpointFile])
- extends OffsetCheckpoints {
+/**
+ * Loads checkpoint files on demand and caches the offsets for reuse.
+ */
+class LazyOffsetCheckpoints(checkpointsByLogDir: Map[String, OffsetCheckpointFile]) extends OffsetCheckpoints {
+ private val lazyCheckpointsByLogDir = checkpointsByLogDir.map { case (logDir, checkpointFile) =>
+ logDir -> new LazyOffsetCheckpointMap(checkpointFile)
+ }.toMap
override def fetch(logDir: String, topicPartition: TopicPartition): Option[Long] = {
- val checkpoint = checkpointFilesByLogDir(logDir)
- val offsetMap = checkpoint.read()
- offsetMap.get(topicPartition)
+ val offsetCheckpointFile = lazyCheckpointsByLogDir.getOrElse(logDir,
+ throw new IllegalArgumentException(s"No checkpoint file for log dir $logDir"))
+ offsetCheckpointFile.fetch(topicPartition)
+ }
+}
+
+class LazyOffsetCheckpointMap(checkpoint: OffsetCheckpointFile) {
+ private lazy val offsets: Map[TopicPartition, Long] = checkpoint.read()
+
+ def fetch(topicPartition: TopicPartition): Option[Long] = {
+ offsets.get(topicPartition)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 59248f0..64a29ed 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import TestUtils.createBroker
import kafka.cluster.BrokerEndPoint
-import kafka.server.checkpoints.SimpleOffsetCheckpoints
+import kafka.server.checkpoints.LazyOffsetCheckpoints
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.timer.MockTimer
import kafka.zk.KafkaZkClient
@@ -87,7 +87,7 @@ class ReplicaManagerTest {
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
val partition = rm.createPartition(new TopicPartition(topic, 1))
- partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(1, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks()
} finally {
// shutdown the replica manager upon test completion
@@ -106,7 +106,7 @@ class ReplicaManagerTest {
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
val partition = rm.createPartition(new TopicPartition(topic, 1))
- partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(1, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks()
} finally {
// shutdown the replica manager upon test completion
@@ -160,7 +160,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val partition = rm.createPartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
collection.immutable.Map(new TopicPartition(topic, 0) ->
@@ -204,7 +204,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -255,7 +255,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -351,7 +351,7 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -417,7 +417,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1, 2).asJava
val partition = rm.createPartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
+ partition.getOrCreateReplica(0, isNew = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -552,7 +552,7 @@ class ReplicaManagerTest {
// Create 2 partitions, assign replica 0 as the leader for both, with a different follower (1 and 2) for each
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
- val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
replicaManager.createPartition(tp1).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
@@ -645,7 +645,7 @@ class ReplicaManagerTest {
// Initialize partition state to follower, with leader = 1, leaderEpoch = 1
val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition))
- val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
partition.getOrCreateReplica(followerBrokerId, isNew = false, offsetCheckpoints)
partition.makeFollower(controllerId,
leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
index 2d20674..99a40b6 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
@@ -22,6 +22,8 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.junit.Assert._
import org.junit.Test
+import org.mockito.Mockito
+import org.scalatest.Assertions.assertThrows
import scala.collection.Map
@@ -98,4 +100,38 @@ class OffsetCheckpointFileTest extends Logging {
new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read()
}
+ @Test
+ def testLazyOffsetCheckpoint(): Unit = {
+ val logDir = "/tmp/kafka-logs"
+ val mockCheckpointFile = Mockito.mock(classOf[OffsetCheckpointFile])
+
+ val lazyCheckpoints = new LazyOffsetCheckpoints(Map(logDir -> mockCheckpointFile))
+ Mockito.verify(mockCheckpointFile, Mockito.never()).read()
+
+ val partition0 = new TopicPartition("foo", 0)
+ val partition1 = new TopicPartition("foo", 1)
+ val partition2 = new TopicPartition("foo", 2)
+
+ Mockito.when(mockCheckpointFile.read()).thenReturn(Map(
+ partition0 -> 1000L,
+ partition1 -> 2000L
+ ))
+
+ assertEquals(Some(1000L), lazyCheckpoints.fetch(logDir, partition0))
+ assertEquals(Some(2000L), lazyCheckpoints.fetch(logDir, partition1))
+ assertEquals(None, lazyCheckpoints.fetch(logDir, partition2))
+
+ Mockito.verify(mockCheckpointFile, Mockito.times(1)).read()
+ }
+
+ @Test
+ def testLazyOffsetCheckpointFileInvalidLogDir(): Unit = {
+ val logDir = "/tmp/kafka-logs"
+ val mockCheckpointFile = Mockito.mock(classOf[OffsetCheckpointFile])
+ val lazyCheckpoints = new LazyOffsetCheckpoints(Map(logDir -> mockCheckpointFile))
+ assertThrows[IllegalArgumentException] {
+ lazyCheckpoints.fetch("/invalid/kafka-logs", new TopicPartition("foo", 0))
+ }
+ }
+
}