You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/03/26 03:54:09 UTC
[kafka] branch trunk updated: MINOR: Improve performance of
checkpointHighWatermarks, patch 1/2 (#6741)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8cf781e MINOR: Improve performance of checkpointHighWatermarks, patch 1/2 (#6741)
8cf781e is described below
commit 8cf781ef0196995ef99d875612b6b7e9adcb912b
Author: Gardner Vickers <ga...@vickers.me>
AuthorDate: Wed Mar 25 20:53:42 2020 -0700
MINOR: Improve performance of checkpointHighWatermarks, patch 1/2 (#6741)
This PR works to improve high watermark checkpointing performance.
`ReplicaManager.checkpointHighWatermarks()` was found to be a major contributor to GC pressure, especially on Kafka clusters with high partition counts and low throughput.
Added a JMH benchmark for `checkpointHighWatermarks` which establishes a
performance baseline. The parameterized benchmark was run with 100, 1000 and
2000 topics.
Modified `ReplicaManager.checkpointHighWatermarks()` to avoid extra copies and cached
the Log parent directory String to avoid frequent allocations when calculating
`File.getParent()`.
A few clean-ups:
* Changed all usages of Log.dir.getParent to Log.parentDir and Log.dir.getParentFile to
Log.parentDirFile.
* Only expose public accessor for `Log.dir` (consistent with `Log.parentDir`)
* Removed unused parameters in `Partition.makeLeader`, `Partition.makeFollower` and `Partition.createLogIfNotExists`.
Benchmark results:
| Topic Count | Ops/ms | MB/sec allocated |
|-------------|---------|------------------|
| 100 | + 51% | - 91% |
| 1000 | + 143% | - 49% |
| 2000 | + 149% | - 50% |
Reviewers: Lucas Bradstreet <lu...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Co-authored-by: Gardner Vickers <ga...@vickers.me>
Co-authored-by: Ismael Juma <is...@juma.me.uk>
---
.gitignore | 1 +
build.gradle | 6 +-
checkstyle/import-control-jmh-benchmarks.xml | 1 +
checkstyle/import-control.xml | 11 --
core/src/main/scala/kafka/cluster/Partition.scala | 38 ++---
core/src/main/scala/kafka/log/Log.scala | 63 +++++---
core/src/main/scala/kafka/log/LogCleaner.scala | 6 +-
.../main/scala/kafka/log/LogCleanerManager.scala | 6 +-
core/src/main/scala/kafka/log/LogManager.scala | 38 ++---
.../main/scala/kafka/server/ReplicaManager.scala | 47 +++---
.../unit/kafka/cluster/AssignmentStateTest.scala | 2 +-
.../unit/kafka/cluster/PartitionLockTest.scala | 12 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 110 +++++--------
.../unit/kafka/server/ReplicaManagerTest.scala | 45 +++---
gradle/spotbugs-exclude.xml | 3 +-
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +-
.../partition/PartitionMakeFollowerBenchmark.java | 4 +-
.../UpdateFollowerFetchStateBenchmark.java | 2 +-
.../jmh/server/HighwatermarkCheckpointBench.java | 174 +++++++++++++++++++++
19 files changed, 360 insertions(+), 211 deletions(-)
diff --git a/.gitignore b/.gitignore
index 4bdb94e..f17d9dd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,4 +54,5 @@ systest/
clients/src/generated
clients/src/generated-test
jmh-benchmarks/generated
+jmh-benchmarks/src/main/generated
streams/src/generated
diff --git a/build.gradle b/build.gradle
index aa8bf97..092f4ff 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1540,10 +1540,14 @@ project(':jmh-benchmarks') {
compile project(':core')
compile project(':clients')
compile project(':streams')
+ compile project(':core')
+ compile project(':clients').sourceSets.test.output
+ compile project(':core').sourceSets.test.output
compile libs.jmhCore
- compile libs.mockitoCore
annotationProcessor libs.jmhGeneratorAnnProcess
compile libs.jmhCoreBenchmarks
+ compile libs.mockitoCore
+ compile libs.slf4jlog4j
}
jar {
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 3536ccb..4b546cb 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -39,6 +39,7 @@
<allow pkg="kafka.controller"/>
<allow pkg="kafka.coordinator"/>
<allow pkg="kafka.network"/>
+ <allow pkg="kafka.utils"/>
<allow pkg="kafka.zk"/>
<allow class="kafka.utils.Pool"/>
<allow class="kafka.utils.KafkaScheduler"/>
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c4a7662..3e41e97 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -284,17 +284,6 @@
</subpackage>
</subpackage>
- <subpackage name="jmh">
- <allow pkg="org.openjdk.jmh.annotations" />
- <allow pkg="org.openjdk.jmh.runner" />
- <allow pkg="org.openjdk.jmh.runner.options" />
- <allow pkg="org.openjdk.jmh.infra" />
- <allow pkg="org.apache.kafka.common" />
- <allow pkg="org.apache.kafka.clients" />
- <allow pkg="org.apache.kafka.streams" />
- <allow pkg="org.github.jamm" />
- </subpackage>
-
<subpackage name="log4jappender">
<allow pkg="org.apache.log4j" />
<allow pkg="org.apache.kafka.clients" />
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e55f89d..0c2fccb 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -19,7 +19,7 @@ package kafka.cluster
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.{Optional, Properties}
-import kafka.api.{ApiVersion, LeaderAndIsr, Request}
+import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.KafkaController
import kafka.log._
@@ -266,7 +266,7 @@ class Partition(val topicPartition: TopicPartition,
// current replica and the existence of the future replica, no other thread can update the log directory of the
// current replica or remove the future replica.
inWriteLock(leaderIsrUpdateLock) {
- val currentLogDir = localLogOrException.dir.getParent
+ val currentLogDir = localLogOrException.parentDir
if (currentLogDir == logDir) {
info(s"Current log directory $currentLogDir is same as requested log dir $logDir. " +
s"Skipping future replica creation.")
@@ -274,34 +274,34 @@ class Partition(val topicPartition: TopicPartition,
} else {
futureLog match {
case Some(partitionFutureLog) =>
- val futureLogDir = partitionFutureLog.dir.getParent
+ val futureLogDir = partitionFutureLog.parentDir
if (futureLogDir != logDir)
throw new IllegalStateException(s"The future log dir $futureLogDir of $topicPartition is " +
s"different from the requested log dir $logDir")
false
case None =>
- createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
+ createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
true
}
}
}
}
- def createLogIfNotExists(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
+ def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
isFutureReplica match {
case true if futureLog.isEmpty =>
- val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+ val log = createLog(isNew, isFutureReplica, offsetCheckpoints)
this.futureLog = Option(log)
case false if log.isEmpty =>
- val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+ val log = createLog(isNew, isFutureReplica, offsetCheckpoints)
this.log = Option(log)
case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
}
}
// Visible for testing
- private[cluster] def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
- val fetchLogConfig = () => {
+ private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
+ def fetchLogConfig: LogConfig = {
val props = stateStore.fetchTopicConfig()
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
}
@@ -309,8 +309,8 @@ class Partition(val topicPartition: TopicPartition,
logManager.initializingLog(topicPartition)
var maybeLog: Option[Log] = None
try {
- val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig(), isNew, isFutureReplica)
- val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
+ val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig, isNew, isFutureReplica)
+ val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition")
0L
}
@@ -319,7 +319,7 @@ class Partition(val topicPartition: TopicPartition,
maybeLog = Some(log)
log
} finally {
- logManager.finishedInitializingLog(topicPartition, maybeLog, fetchLogConfig)
+ logManager.finishedInitializingLog(topicPartition, maybeLog, () => fetchLogConfig)
}
}
@@ -410,7 +410,7 @@ class Partition(val topicPartition: TopicPartition,
def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
inReadLock(leaderIsrUpdateLock) {
- futureLog.exists(_.dir.getParent != newDestinationDir)
+ futureLog.exists(_.parentDir != newDestinationDir)
}
}
@@ -478,9 +478,7 @@ class Partition(val topicPartition: TopicPartition,
* from the time when this broker was the leader last time) and setting the new leader and ISR.
* If the leader replica id does not change, return false to indicate the replica manager.
*/
- def makeLeader(controllerId: Int,
- partitionState: LeaderAndIsrPartitionState,
- correlationId: Int,
+ def makeLeader(partitionState: LeaderAndIsrPartitionState,
highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
@@ -493,7 +491,7 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
)
- createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
+ createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
val leaderLog = localLogOrException
val leaderEpochStartOffset = leaderLog.logEndOffset
@@ -549,9 +547,7 @@ class Partition(val topicPartition: TopicPartition,
* greater (that is, no updates have been missed), return false to indicate to the
* replica manager that state is already correct and the become-follower steps can be skipped
*/
- def makeFollower(controllerId: Int,
- partitionState: LeaderAndIsrPartitionState,
- correlationId: Int,
+ def makeFollower(partitionState: LeaderAndIsrPartitionState,
highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val newLeaderBrokerId = partitionState.leader
@@ -566,7 +562,7 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
)
- createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
+ createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
leaderEpoch = partitionState.leaderEpoch
leaderEpochStartOffsetOpt = None
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c6bce27..20b0cd2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -188,7 +188,7 @@ object RollParams {
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
* for a given segment.
*
- * @param dir The directory in which log segments are created.
+ * @param _dir The directory in which log segments are created.
* @param config The log configuration settings
* @param logStartOffset The earliest offset allowed to be exposed to kafka client.
* The logStartOffset can be updated by :
@@ -209,7 +209,7 @@ object RollParams {
* @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
*/
@threadsafe
-class Log(@volatile var dir: File,
+class Log(@volatile private var _dir: File,
@volatile var config: LogConfig,
@volatile var logStartOffset: Long,
@volatile var recoveryPoint: Long,
@@ -228,36 +228,17 @@ class Log(@volatile var dir: File,
/* A lock that guards all modifications to the log */
private val lock = new Object
+
// The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
// After memory mapped buffer is closed, no disk IO operation should be performed for this log
@volatile private var isMemoryMappedBufferClosed = false
+ // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+ @volatile private var _parentDir: String = dir.getParent
+
/* last time it was flushed */
private val lastFlushedTime = new AtomicLong(time.milliseconds)
- def initFileSize: Int = {
- if (config.preallocate)
- config.segmentSize
- else
- 0
- }
-
- def updateConfig(newConfig: LogConfig): Unit = {
- val oldConfig = this.config
- this.config = newConfig
- val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
- val newRecordVersion = newConfig.messageFormatVersion.recordVersion
- if (newRecordVersion.precedes(oldRecordVersion))
- warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
- if (newRecordVersion.value != oldRecordVersion.value)
- initializeLeaderEpochCache()
- }
-
- private def checkIfMemoryMappedBufferClosed(): Unit = {
- if (isMemoryMappedBufferClosed)
- throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
- }
-
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
/* The earliest offset which is part of an incomplete transaction. This is used to compute the
@@ -316,6 +297,35 @@ class Log(@volatile var dir: File,
s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
}
+ def dir: File = _dir
+
+ def parentDir: String = _parentDir
+
+ def parentDirFile: File = new File(_parentDir)
+
+ def initFileSize: Int = {
+ if (config.preallocate)
+ config.segmentSize
+ else
+ 0
+ }
+
+ def updateConfig(newConfig: LogConfig): Unit = {
+ val oldConfig = this.config
+ this.config = newConfig
+ val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
+ val newRecordVersion = newConfig.messageFormatVersion.recordVersion
+ if (newRecordVersion.precedes(oldRecordVersion))
+ warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
+ if (newRecordVersion.value != oldRecordVersion.value)
+ initializeLeaderEpochCache()
+ }
+
+ private def checkIfMemoryMappedBufferClosed(): Unit = {
+ if (isMemoryMappedBufferClosed)
+ throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
+ }
+
def highWatermark: Long = highWatermarkMetadata.messageOffset
/**
@@ -961,7 +971,8 @@ class Log(@volatile var dir: File,
val renamedDir = new File(dir.getParent, name)
Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
if (renamedDir != dir) {
- dir = renamedDir
+ _dir = renamedDir
+ _parentDir = renamedDir.getParent
logSegments.foreach(_.updateDir(renamedDir))
producerStateManager.logDir = dir
// re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 312483d..22207e0 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -315,7 +315,7 @@ class LogCleaner(initialConfig: CleanerConfig,
} catch {
case e: LogCleaningException =>
warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e)
- cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition)
+ cleanerManager.markPartitionUncleanable(e.log.parentDir, e.log.topicPartition)
false
}
@@ -365,11 +365,11 @@ class LogCleaner(initialConfig: CleanerConfig,
case _: LogCleaningAbortedException => // task can be aborted, let it go.
case _: KafkaStorageException => // partition is already offline. let it go.
case e: IOException =>
- val logDirectory = cleanable.log.dir.getParent
+ val logDirectory = cleanable.log.parentDir
val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
} finally {
- cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
+ cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.parentDirFile, endOffset)
}
}
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index fe22a61..ba007c7 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -182,7 +182,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
// update checkpoint for logs with invalid checkpointed offsets
if (offsetsToClean.forceUpdateCheckpoint)
- updateCheckpoints(log.dir.getParentFile(), Option(topicPartition, offsetsToClean.firstDirtyOffset))
+ updateCheckpoints(log.parentDirFile, Option(topicPartition, offsetsToClean.firstDirtyOffset))
val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
@@ -379,7 +379,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
case Some(offset) =>
// Remove this partition from the checkpoint file in the source log directory
updateCheckpoints(sourceLogDir, None)
- // Add offset for this partition to the checkpoint file in the source log directory
+ // Add offset for this partition to the checkpoint file in the destination log directory
updateCheckpoints(destLogDir, Option(topicPartition, offset))
case None =>
}
@@ -478,7 +478,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = {
inLock(lock) {
- uncleanablePartitions.get(log.dir.getParent).exists(partitions => partitions.contains(topicPartition))
+ uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
}
}
}
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d6b14b0..bf4351c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -199,7 +199,7 @@ class LogManager(logDirs: Seq[File],
cleaner.handleLogDirFailure(dir)
val offlineCurrentTopicPartitions = currentLogs.collect {
- case (tp, log) if log.dir.getParent == dir => tp
+ case (tp, log) if log.parentDir == dir => tp
}
offlineCurrentTopicPartitions.foreach { topicPartition => {
val removedLog = currentLogs.remove(topicPartition)
@@ -210,7 +210,7 @@ class LogManager(logDirs: Seq[File],
}}
val offlineFutureTopicPartitions = futureLogs.collect {
- case (tp, log) if log.dir.getParent == dir => tp
+ case (tp, log) if log.parentDir == dir => tp
}
offlineFutureTopicPartitions.foreach { topicPartition => {
val removedLog = futureLogs.remove(topicPartition)
@@ -282,7 +282,7 @@ class LogManager(logDirs: Seq[File],
}
if (previous != null) {
if (log.isFuture)
- throw new IllegalStateException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+ throw new IllegalStateException(s"Duplicate log directories found: ${log.dir.getAbsolutePath}, ${previous.dir.getAbsolutePath}")
else
throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " +
s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " +
@@ -514,7 +514,7 @@ class LogManager(logDirs: Seq[File],
if (log.truncateTo(truncateOffset))
affectedLogs += log
if (needToStopCleaner && !isFuture)
- cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+ cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
} finally {
if (needToStopCleaner && !isFuture) {
cleaner.resumeCleaning(Seq(topicPartition))
@@ -524,7 +524,7 @@ class LogManager(logDirs: Seq[File],
}
}
- for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) {
+ for ((dir, logs) <- affectedLogs.groupBy(_.parentDirFile)) {
checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs)
}
}
@@ -551,7 +551,7 @@ class LogManager(logDirs: Seq[File],
try {
log.truncateFullyAndStartAt(newOffset)
if (cleaner != null && !isFuture) {
- cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+ cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
}
} finally {
if (cleaner != null && !isFuture) {
@@ -559,7 +559,7 @@ class LogManager(logDirs: Seq[File],
info(s"Compaction for partition $topicPartition is resumed")
}
}
- checkpointRecoveryOffsetsAndCleanSnapshot(log.dir.getParentFile, Seq(log))
+ checkpointRecoveryOffsetsAndCleanSnapshot(log.parentDirFile, Seq(log))
}
}
@@ -633,8 +633,8 @@ class LogManager(logDirs: Seq[File],
// The logDir should be an absolute path
def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
// Do not cache the preferred log directory if either the current log or the future log for this partition exists in the specified logDir
- if (!getLog(topicPartition).exists(_.dir.getParent == logDir) &&
- !getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir))
+ if (!getLog(topicPartition).exists(_.parentDir == logDir) &&
+ !getLog(topicPartition, isFuture = true).exists(_.parentDir == logDir))
preferredLogDirs.put(topicPartition, logDir)
}
@@ -723,7 +723,7 @@ class LogManager(logDirs: Seq[File],
if (isFuture) {
if (preferredLogDir == null)
throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
- else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
+ else if (getLog(topicPartition).get.parentDir == preferredLogDir)
throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
}
@@ -818,7 +818,7 @@ class LogManager(logDirs: Seq[File],
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
case e: KafkaStorageException =>
- error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
+ error(s"Exception while deleting $removedLog in dir ${removedLog.parentDir}.", e)
}
}
}
@@ -866,7 +866,7 @@ class LogManager(logDirs: Seq[File],
futureLogs.remove(topicPartition)
currentLogs.put(topicPartition, destLog)
if (cleaner != null) {
- cleaner.alterCheckpointDir(topicPartition, sourceLog.dir.getParentFile, destLog.dir.getParentFile)
+ cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
cleaner.resumeCleaning(Seq(topicPartition))
info(s"Compaction for partition $topicPartition is resumed")
}
@@ -876,8 +876,8 @@ class LogManager(logDirs: Seq[File],
// Now that replica in source log directory has been successfully renamed for deletion.
// Close the log, update checkpoint files, and enqueue this log to be deleted.
sourceLog.close()
- checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.dir.getParentFile, ArrayBuffer.empty)
- checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
+ checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty)
+ checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
addLogToBeDeleted(sourceLog)
} catch {
case e: KafkaStorageException =>
@@ -911,11 +911,11 @@ class LogManager(logDirs: Seq[File],
//We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
if (cleaner != null && !isFuture) {
cleaner.abortCleaning(topicPartition)
- cleaner.updateCheckpoints(removedLog.dir.getParentFile)
+ cleaner.updateCheckpoints(removedLog.parentDirFile)
}
removedLog.renameDir(Log.logDeleteDirName(topicPartition))
- checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty)
- checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
+ checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty)
+ checkpointLogStartOffsetsInDir(removedLog.parentDirFile)
addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
} else if (offlineLogDirs.nonEmpty) {
@@ -934,7 +934,7 @@ class LogManager(logDirs: Seq[File],
List(_liveLogDirs.peek())
} else {
// count the number of logs in each parent directory (including 0 for empty directories
- val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
+ val logCounts = allLogs.groupBy(_.parentDir).mapValues(_.size)
val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
val dirCounts = (zeros ++ logCounts).toBuffer
@@ -1005,7 +1005,7 @@ class LogManager(logDirs: Seq[File],
*/
private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
(this.currentLogs.toList ++ this.futureLogs.toList).toMap
- .groupBy { case (_, log) => log.dir.getParent }
+ .groupBy { case (_, log) => log.parentDir }
}
// logDir should be an absolute path
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 084ffa5..b50d9be 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -480,7 +480,7 @@ class ReplicaManager(val config: KafkaConfig,
}
def getLogDir(topicPartition: TopicPartition): Option[String] = {
- localLog(topicPartition).map(_.dir.getParent)
+ localLog(topicPartition).map(_.parentDir)
}
/**
@@ -661,7 +661,7 @@ class ReplicaManager(val config: KafkaConfig,
* are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
*/
def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
- val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent)
+ val logsByDir = logManager.allLogs.groupBy(log => log.parentDir)
config.logDirs.toSet.map { logDir: String =>
val absolutePath = new File(logDir).getAbsolutePath
@@ -1303,7 +1303,7 @@ class ReplicaManager(val config: KafkaConfig,
val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
// Add future replica to partition's map
- partition.createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true,
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = true,
highWatermarkCheckpoints)
// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
@@ -1369,7 +1369,7 @@ class ReplicaManager(val config: KafkaConfig,
// Update the partition information to be the leader
partitionStates.foreach { case (partition, partitionState) =>
try {
- if (partition.makeLeader(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) {
+ if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) {
partitionsToMakeLeaders += partition
stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
@@ -1451,7 +1451,7 @@ class ReplicaManager(val config: KafkaConfig,
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available
case Some(_) =>
- if (partition.makeFollower(controllerId, partitionState, correlationId, highWatermarkCheckpoints))
+ if (partition.makeFollower(partitionState, highWatermarkCheckpoints))
partitionsToMakeFollower += partition
else
stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
@@ -1468,7 +1468,7 @@ class ReplicaManager(val config: KafkaConfig,
s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
// Create the local replica even if the leader is unavailable. This is required to ensure that we include
// the partition's high watermark in the checkpoint file (see KAFKA-1647)
- partition.createLogIfNotExists(localBrokerId, isNew = partitionState.isNew, isFutureReplica = false,
+ partition.createLogIfNotExists(isNew = partitionState.isNew, isFutureReplica = false,
highWatermarkCheckpoints)
}
} catch {
@@ -1600,20 +1600,25 @@ class ReplicaManager(val config: KafkaConfig,
// Flushes the highwatermark value for all partitions to the highwatermark file
def checkpointHighWatermarks(): Unit = {
- val localLogs = nonOfflinePartitionsIterator.flatMap { partition =>
- val logsList: mutable.Set[Log] = mutable.Set()
- partition.log.foreach(logsList.add)
- partition.futureLog.foreach(logsList.add)
- logsList
- }.toBuffer
- val logsByDir = localLogs.groupBy(_.dir.getParent)
- for ((dir, logs) <- logsByDir) {
- val hwms = logs.map(log => log.topicPartition -> log.highWatermark).toMap
- try {
- highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
- } catch {
+ def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]],
+ log: Log): Unit = {
+ val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir,
+ new mutable.AnyRefMap[TopicPartition, Long]())
+ checkpoints.put(log.topicPartition, log.highWatermark)
+ }
+
+ val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]](
+ allPartitions.size)
+ nonOfflinePartitionsIterator.foreach { partition =>
+ partition.log.foreach(putHw(logDirToHws, _))
+ partition.futureLog.foreach(putHw(logDirToHws, _))
+ }
+
+ for ((logDir, hws) <- logDirToHws) {
+ try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws))
+ catch {
case e: KafkaStorageException =>
- error(s"Error while writing to highwatermark file in directory $dir", e)
+ error(s"Error while writing to highwatermark file in directory $logDir", e)
}
}
}
@@ -1632,11 +1637,11 @@ class ReplicaManager(val config: KafkaConfig,
warn(s"Stopping serving replicas in dir $dir")
replicaStateChangeLock synchronized {
val newOfflinePartitions = nonOfflinePartitionsIterator.filter { partition =>
- partition.log.exists { _.dir.getParent == dir }
+ partition.log.exists { _.parentDir == dir }
}.map(_.topicPartition).toSet
val partitionsWithOfflineFutureReplica = nonOfflinePartitionsIterator.filter { partition =>
- partition.futureLog.exists { _.dir.getParent == dir }
+ partition.futureLog.exists { _.parentDir == dir }
}.toSet
replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
diff --git a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala
index 08dd95e..b218bf7 100644
--- a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala
@@ -112,7 +112,7 @@ class AssignmentStateTest(isr: List[Integer], replicas: List[Integer],
if (original.nonEmpty)
partition.assignmentState = SimpleAssignmentState(original)
// do the test
- partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+ partition.makeLeader(leaderState, offsetCheckpoints)
assertEquals(isReassigning, partition.isReassigning)
if (adding.nonEmpty)
adding.foreach(r => assertTrue(partition.isAddingReplica(r)))
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index cb1fd80..c8a7d33 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -222,8 +222,8 @@ class PartitionLockTest extends Logging {
}
}
- override def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
- val log = super.createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+ override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
+ val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints)
new SlowLog(log, mockTime, appendSemaphore)
}
}
@@ -235,21 +235,21 @@ class PartitionLockTest extends Logging {
when(stateStore.expandIsr(ArgumentMatchers.anyInt, ArgumentMatchers.any[LeaderAndIsr]))
.thenReturn(Some(2))
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val controllerId = 0
val controllerEpoch = 0
val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava
val isr = replicas
- assertTrue("Expected become leader transition to succeed", partition.makeLeader(controllerId, new LeaderAndIsrPartitionState()
+ assertTrue("Expected become leader transition to succeed", partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setZkVersion(1)
.setReplicas(replicas)
- .setIsNew(true), 0, offsetCheckpoints))
+ .setIsNew(true), offsetCheckpoints))
partition
}
@@ -310,4 +310,4 @@ class PartitionLockTest extends Logging {
appendInfo
}
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7967428..62ec28b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -109,7 +109,7 @@ class PartitionTest extends AbstractPartitionTest {
val latch = new CountDownLatch(1)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
- partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
@@ -153,13 +153,13 @@ class PartitionTest extends AbstractPartitionTest {
metadataCache,
logManager) {
- override def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
- val log = super.createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+ override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
+ val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints)
new SlowLog(log, mockTime, appendSemaphore)
}
}
- partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
val appendThread = new Thread {
override def run(): Unit = {
@@ -180,7 +180,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false)
- assertTrue(partition.makeFollower(0, partitionState, 0, offsetCheckpoints))
+ assertTrue(partition.makeFollower(partitionState, offsetCheckpoints))
appendSemaphore.release()
appendThread.join()
@@ -194,7 +194,7 @@ class PartitionTest extends AbstractPartitionTest {
// active segment
def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
- partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
@@ -465,7 +465,6 @@ class PartitionTest extends AbstractPartitionTest {
val leader = brokerId
val follower1 = brokerId + 1
val follower2 = brokerId + 2
- val controllerId = brokerId + 3
val replicas = List(leader, follower1, follower2)
val isr = List[Integer](leader, follower2).asJava
val leaderEpoch = 8
@@ -486,7 +485,7 @@ class PartitionTest extends AbstractPartitionTest {
.setIsNew(true)
assertTrue("Expected first makeLeader() to return 'leader changed'",
- partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
+ partition.makeLeader(leaderState, offsetCheckpoints))
assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
@@ -561,7 +560,7 @@ class PartitionTest extends AbstractPartitionTest {
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
- assertTrue(partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints))
+ assertTrue(partition.makeFollower(followerState, offsetCheckpoints))
// Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition
val newLeaderState = new LeaderAndIsrPartitionState()
@@ -573,7 +572,7 @@ class PartitionTest extends AbstractPartitionTest {
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
- assertTrue(partition.makeLeader(controllerId, newLeaderState, 2, offsetCheckpoints))
+ assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints))
// Try to get offsets as a client
fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
@@ -636,34 +635,33 @@ class PartitionTest extends AbstractPartitionTest {
private def setupPartitionWithMocks(leaderEpoch: Int,
isLeader: Boolean,
log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
- val controllerId = 0
val controllerEpoch = 0
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicas
if (isLeader) {
assertTrue("Expected become leader transition to succeed",
- partition.makeLeader(controllerId, new LeaderAndIsrPartitionState()
+ partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setZkVersion(1)
.setReplicas(replicas)
- .setIsNew(true), 0, offsetCheckpoints))
+ .setIsNew(true), offsetCheckpoints))
assertEquals(leaderEpoch, partition.getLeaderEpoch)
} else {
assertTrue("Expected become follower transition to succeed",
- partition.makeFollower(controllerId, new LeaderAndIsrPartitionState()
+ partition.makeFollower(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId + 1)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setZkVersion(1)
.setReplicas(replicas)
- .setIsNew(true), 0, offsetCheckpoints))
+ .setIsNew(true), offsetCheckpoints))
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(None, partition.leaderLogIfLocal)
}
@@ -673,7 +671,7 @@ class PartitionTest extends AbstractPartitionTest {
@Test
def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val log = partition.localLogOrException
val initialLogStartOffset = 5L
@@ -723,7 +721,6 @@ class PartitionTest extends AbstractPartitionTest {
@Test
def testListOffsetIsolationLevels(): Unit = {
- val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val replicas = List[Integer](brokerId, brokerId + 1).asJava
@@ -731,17 +728,17 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch()
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed",
- partition.makeLeader(controllerId, new LeaderAndIsrPartitionState()
+ partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setZkVersion(1)
.setReplicas(replicas)
- .setIsNew(true), 0, offsetCheckpoints))
+ .setIsNew(true), offsetCheckpoints))
assertEquals(leaderEpoch, partition.getLeaderEpoch)
val records = createTransactionalRecords(List(
@@ -811,7 +808,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false)
- partition.makeFollower(0, partitionState, 0, offsetCheckpoints)
+ partition.makeFollower(partitionState, offsetCheckpoints)
// Request with same leader and epoch increases by only 1, do become-follower steps
partitionState = new LeaderAndIsrPartitionState()
@@ -822,7 +819,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false)
- assertTrue(partition.makeFollower(0, partitionState, 2, offsetCheckpoints))
+ assertTrue(partition.makeFollower(partitionState, offsetCheckpoints))
// Request with same leader and same epoch, skip become-follower steps
partitionState = new LeaderAndIsrPartitionState()
@@ -832,7 +829,7 @@ class PartitionTest extends AbstractPartitionTest {
.setIsr(List[Integer](0, 1, 2, brokerId).asJava)
.setZkVersion(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
- assertFalse(partition.makeFollower(0, partitionState, 2, offsetCheckpoints))
+ assertFalse(partition.makeFollower(partitionState, offsetCheckpoints))
}
@Test
@@ -841,7 +838,6 @@ class PartitionTest extends AbstractPartitionTest {
val leader = brokerId
val follower1 = brokerId + 1
val follower2 = brokerId + 2
- val controllerId = brokerId + 3
val replicas = List[Integer](leader, follower1, follower2).asJava
val isr = List[Integer](leader, follower2).asJava
val leaderEpoch = 8
@@ -862,7 +858,7 @@ class PartitionTest extends AbstractPartitionTest {
.setReplicas(replicas)
.setIsNew(true)
assertTrue("Expected first makeLeader() to return 'leader changed'",
- partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
+ partition.makeLeader(leaderState, offsetCheckpoints))
assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
@@ -898,7 +894,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(false)
- partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints)
+ partition.makeFollower(followerState, offsetCheckpoints)
val newLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
@@ -909,7 +905,7 @@ class PartitionTest extends AbstractPartitionTest {
.setReplicas(replicas)
.setIsNew(false)
assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()",
- partition.makeLeader(controllerEpoch, newLeaderState, 2, offsetCheckpoints))
+ partition.makeLeader(newLeaderState, offsetCheckpoints))
val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset
// append records with the latest leader epoch
@@ -937,7 +933,6 @@ class PartitionTest extends AbstractPartitionTest {
*/
@Test
def testDelayedFetchAfterAppendRecords(): Unit = {
- val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
@@ -978,7 +973,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicaIds)
.setIsNew(true)
- partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+ partition.makeLeader(leaderState, offsetCheckpoints)
partitions += partition
}
@@ -1035,8 +1030,7 @@ class PartitionTest extends AbstractPartitionTest {
}
def createTransactionalRecords(records: Iterable[SimpleRecord],
- baseOffset: Long,
- partitionLeaderEpoch: Int = 0): MemoryRecords = {
+ baseOffset: Long): MemoryRecords = {
val producerId = 1L
val producerEpoch = 0.toShort
val baseSequence = 0
@@ -1058,7 +1052,6 @@ class PartitionTest extends AbstractPartitionTest {
val leader = brokerId
val follower1 = brokerId + 1
val follower2 = brokerId + 2
- val controllerId = brokerId + 3
val replicas = List[Integer](leader, follower1, follower2).asJava
val isr = List[Integer](leader).asJava
val leaderEpoch = 8
@@ -1073,7 +1066,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true)
- partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+ partition.makeLeader(leaderState, offsetCheckpoints)
assertTrue(partition.isAtMinIsr)
}
@@ -1082,7 +1075,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 6, leaderEpoch = 4)
- val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
@@ -1091,12 +1083,11 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch()
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val initializeTimeMs = time.milliseconds()
assertTrue("Expected become leader transition to succeed",
partition.makeLeader(
- controllerId,
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
@@ -1105,7 +1096,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true),
- 0,
offsetCheckpoints))
val remoteReplica = partition.getReplica(remoteBrokerId).get
@@ -1146,7 +1136,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
- val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
@@ -1155,11 +1144,10 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch()
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
"Expected become leader transition to succeed",
partition.makeLeader(
- controllerId,
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
@@ -1168,7 +1156,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
- 0,
offsetCheckpoints)
)
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1213,7 +1200,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
- val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
@@ -1222,10 +1208,9 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch()
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed",
partition.makeLeader(
- controllerId,
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
@@ -1234,7 +1219,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true),
- 0,
offsetCheckpoints))
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1268,7 +1252,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
- val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
@@ -1278,11 +1261,10 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds()
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
"Expected become leader transition to succeed",
partition.makeLeader(
- controllerId,
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
@@ -1291,7 +1273,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
- 0,
offsetCheckpoints)
)
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@@ -1325,7 +1306,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
- val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
@@ -1335,11 +1315,10 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds()
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
"Expected become leader transition to succeed",
partition.makeLeader(
- controllerId,
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
@@ -1348,7 +1327,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
- 0,
offsetCheckpoints)
)
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@@ -1399,7 +1377,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
- val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
@@ -1409,11 +1386,10 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds()
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
"Expected become leader transition to succeed",
partition.makeLeader(
- controllerId,
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
@@ -1422,7 +1398,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
- 0,
offsetCheckpoints)
)
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@@ -1458,7 +1433,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
- val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
@@ -1468,10 +1442,9 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds()
- partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed",
partition.makeLeader(
- controllerId,
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
@@ -1480,7 +1453,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true),
- 0,
offsetCheckpoints))
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1513,7 +1485,6 @@ class PartitionTest extends AbstractPartitionTest {
when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition))
.thenReturn(Some(4L))
- val controllerId = 0
val controllerEpoch = 3
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val leaderState = new LeaderAndIsrPartitionState()
@@ -1524,7 +1495,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(false)
- partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+ partition.makeLeader(leaderState, offsetCheckpoints)
assertEquals(4, partition.localLogOrException.highWatermark)
}
@@ -1553,7 +1524,6 @@ class PartitionTest extends AbstractPartitionTest {
@Test
def testUnderReplicatedPartitionsCorrectSemantics(): Unit = {
- val controllerId = 0
val controllerEpoch = 3
val replicas = List[Integer](brokerId, brokerId + 1, brokerId + 2).asJava
val isr = List[Integer](brokerId, brokerId + 1).asJava
@@ -1566,11 +1536,11 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(false)
- partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+ partition.makeLeader(leaderState, offsetCheckpoints)
assertTrue(partition.isUnderReplicated)
leaderState = leaderState.setIsr(replicas)
- partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+ partition.makeLeader(leaderState, offsetCheckpoints)
assertFalse(partition.isUnderReplicated)
}
@@ -1626,7 +1596,7 @@ class PartitionTest extends AbstractPartitionTest {
metadataCache,
spyLogManager)
- partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+ partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
// Validate that initializingLog and finishedInitializingLog was called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -1660,7 +1630,7 @@ class PartitionTest extends AbstractPartitionTest {
metadataCache,
spyLogManager)
- partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+ partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
// Validate that initializingLog and finishedInitializingLog was called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -1695,7 +1665,7 @@ class PartitionTest extends AbstractPartitionTest {
metadataCache,
spyLogManager)
- partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+ partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
// Validate that initializingLog and finishedInitializingLog was called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e20ed10..4b60020 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -89,7 +89,7 @@ class ReplicaManagerTest {
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
val partition = rm.createPartition(new TopicPartition(topic, 1))
- partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false,
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks()
} finally {
@@ -109,7 +109,7 @@ class ReplicaManagerTest {
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
val partition = rm.createPartition(new TopicPartition(topic, 1))
- partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false,
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks()
} finally {
@@ -164,7 +164,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val partition = rm.createPartition(new TopicPartition(topic, 0))
- partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -216,7 +216,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
replicaManager.createPartition(topicPartition)
- .createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+ .createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -270,7 +270,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
- partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
@@ -330,7 +330,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
- partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
@@ -436,7 +436,7 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
- partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
@@ -512,7 +512,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1, 2).asJava
val partition = rm.createPartition(new TopicPartition(topic, 0))
- partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader.
@@ -668,8 +668,8 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
- replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
- replicaManager.createPartition(tp1).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+ replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val partition1Replicas = Seq[Integer](0, 2).asJava
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -782,10 +782,10 @@ class ReplicaManagerTest {
val tp = new TopicPartition(topic, topicPartition)
val partition = replicaManager.createPartition(tp)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
- partition.createLogIfNotExists(followerBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
- partition.makeFollower(controllerId,
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.makeFollower(
leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
- correlationId, offsetCheckpoints)
+ offsetCheckpoints)
// Make local partition a follower - because epoch increased by more than 1, truncation should
// trigger even though leader does not change
@@ -808,7 +808,6 @@ class ReplicaManagerTest {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
- val controllerId = 0
val leaderEpoch = 1
val leaderEpochIncrement = 2
val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
@@ -823,11 +822,9 @@ class ReplicaManagerTest {
val partition = replicaManager.createPartition(tp)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
- partition.createLogIfNotExists(leaderBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
partition.makeLeader(
- controllerId,
leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
- correlationId,
offsetCheckpoints
)
@@ -977,7 +974,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
- replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@@ -1016,7 +1013,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
- replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1064,7 +1061,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
- replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1112,7 +1109,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
- replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1154,7 +1151,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
- replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1193,7 +1190,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
- replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1293,7 +1290,7 @@ class ReplicaManagerTest {
val mockBrokerTopicStats = new BrokerTopicStats
val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
val mockLog = new Log(
- dir = new File(new File(config.logDirs.head), s"$topic-0"),
+ _dir = new File(new File(config.logDirs.head), s"$topic-0"),
config = LogConfig(),
logStartOffset = 0L,
recoveryPoint = 0L,
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index f3ca317..a717400 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -212,10 +212,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Package name="org.apache.kafka.jmh.cache.generated"/>
<Package name="org.apache.kafka.jmh.common.generated"/>
<Package name="org.apache.kafka.jmh.record.generated"/>
- <Package name="org.apache.kafka.jmh.producer.generated"/>
<Package name="org.apache.kafka.jmh.partition.generated"/>
+ <Package name="org.apache.kafka.jmh.producer.generated"/>
<Package name="org.apache.kafka.jmh.fetchsession.generated"/>
<Package name="org.apache.kafka.jmh.fetcher.generated"/>
+ <Package name="org.apache.kafka.jmh.server.generated"/>
</Or>
</Match>
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index a03c9bf..017926c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -159,7 +159,7 @@ public class ReplicaFetcherThreadBenchmark {
0, Time.SYSTEM, partitionStateStore, new DelayedOperationsMock(tp),
Mockito.mock(MetadataCache.class), logManager);
- partition.makeFollower(0, partitionState, 0, offsetCheckpoints);
+ partition.makeFollower(partitionState, offsetCheckpoints);
pool.put(tp, partition);
offsetAndEpochs.put(tp, new OffsetAndEpoch(0, 0));
BaseRecords fetched = new BaseRecords() {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 2919eba..d78f3e6 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -121,7 +121,7 @@ public class PartitionMakeFollowerBenchmark {
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
partitionStateStore, delayedOperations,
Mockito.mock(MetadataCache.class), logManager);
- partition.createLogIfNotExists(0, true, false, offsetCheckpoints);
+ partition.createLogIfNotExists(true, false, offsetCheckpoints);
executorService.submit((Runnable) () -> {
SimpleRecord[] simpleRecords = new SimpleRecord[] {
new SimpleRecord(1L, "foo".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8)),
@@ -154,7 +154,7 @@ public class PartitionMakeFollowerBenchmark {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true);
- return partition.makeFollower(0, partitionState, 0, offsetCheckpoints);
+ return partition.makeFollower(partitionState, offsetCheckpoints);
}
private static LogConfig createLogConfig() {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index c3e0746..a686c3b 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -119,7 +119,7 @@ public class UpdateFollowerFetchStateBenchmark {
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
partitionStateStore, delayedOperations,
Mockito.mock(MetadataCache.class), logManager);
- partition.makeLeader(0, partitionState, 0, offsetCheckpoints);
+ partition.makeLeader(partitionState, offsetCheckpoints);
}
// avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
new file mode 100644
index 0000000..9cb8ac6
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.server;
+
+import java.util.Properties;
+import kafka.cluster.Partition;
+import kafka.cluster.PartitionStateStore;
+import kafka.log.CleanerConfig;
+import kafka.log.LogConfig;
+import kafka.log.LogManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
+import kafka.server.LogDirFailureChannel;
+import kafka.server.MetadataCache;
+import kafka.server.QuotaFactory;
+import kafka.server.ReplicaManager;
+import kafka.server.checkpoints.OffsetCheckpoints;
+import kafka.utils.KafkaScheduler;
+import kafka.utils.MockTime;
+import kafka.utils.Scheduler;
+import kafka.utils.TestUtils;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import scala.Option;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import scala.jdk.CollectionConverters;
+
+
+@Warmup(iterations = 5) // 5 warmup iterations
+@Measurement(iterations = 5) // 5 measured iterations
+@Fork(3) // 3 forks
+@OutputTimeUnit(TimeUnit.MILLISECONDS) // results are reported in milliseconds
+@State(Scope.Benchmark) // JMH state object with Benchmark scope
+public class HighwatermarkCheckpointBench { // JMH benchmark that times ReplicaManager.checkpointHighWatermarks()
+
+ @Param({"100", "1000", "2000"}) // the benchmark is parameterized over these topic counts
+ public int numTopics;
+
+ @Param({"3"}) // partitions created for every topic in setup()
+ public int numPartitions;
+
+ private final String topicName = "foo"; // name prefix; setup() appends "-<topicNum>"
+
+ private Scheduler scheduler;
+
+ private Metrics metrics;
+
+ private MockTime time;
+
+ private KafkaConfig brokerProperties;
+
+ private ReplicaManager replicaManager;
+ private QuotaFactory.QuotaManagers quotaManagers;
+ private LogDirFailureChannel failureChannel;
+ private LogManager logManager;
+
+
+ @Setup(Level.Trial) // builds a ReplicaManager holding numTopics * numPartitions partitions
+ public void setup() {
+ this.scheduler = new KafkaScheduler(1, "scheduler-thread", true); // presumably 1 daemon thread - confirm against KafkaScheduler
+ this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig( // broker 0 config built by the test helper
+ 0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(), // positional args; meanings are defined by TestUtils.createBrokerConfig
+ Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, (short) 1));
+ this.metrics = new Metrics();
+ this.time = new MockTime();
+ this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size()); // sized by the number of configured log dirs
+ final List<File> files = // configured log dir paths converted to java.io.File
+ CollectionConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+ this.logManager = TestUtils.createLogManager(CollectionConverters.asScalaBuffer(files), // LogManager over those dirs
+ LogConfig.apply(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d, // default LogConfig, explicit CleanerConfig
+ 1024 * 1024, 32 * 1024 * 1024,
+ Double.MAX_VALUE, 15 * 1000, true, "MD5"), time);
+ scheduler.startup(); // started before the ReplicaManager that receives it is constructed
+ final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ final MetadataCache metadataCache =
+ new MetadataCache(this.brokerProperties.brokerId());
+ this.quotaManagers =
+ QuotaFactory.instantiate(this.brokerProperties,
+ this.metrics,
+ this.time, "");
+ KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) { // anonymous subclass built with a null first argument
+ @Override
+ public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) { // stubbed; presumably so no real ZooKeeper lookup happens - confirm
+ return new Properties(); // always an empty config
+ }
+ };
+ this.replicaManager = new ReplicaManager( // wired from the collaborators created above
+ this.brokerProperties,
+ this.metrics,
+ this.time,
+ zkClient,
+ this.scheduler,
+ this.logManager,
+ new AtomicBoolean(false),
+ this.quotaManagers,
+ brokerTopicStats,
+ metadataCache,
+ this.failureChannel,
+ Option.empty());
+ replicaManager.startup();
+
+ List<TopicPartition> topicPartitions = new ArrayList<>(); // receives numTopics * numPartitions entries
+ for (int topicNum = 0; topicNum < numTopics; topicNum++) {
+ final String topicName = this.topicName + "-" + topicNum; // local shadows the field; yields "foo-0", "foo-1", ...
+ for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) {
+ topicPartitions.add(new TopicPartition(topicName, partitionNum));
+ }
+ }
+
+ PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class); // NOTE(review): not referenced again after the stubbing on the next line
+ Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
+ OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L); // stub: every lookup returns offset 0
+ for (TopicPartition topicPartition : topicPartitions) {
+ final Partition partition = this.replicaManager.createPartition(topicPartition);
+ partition.createLogIfNotExists(true, false, checkpoints); // isNew = true, isFutureReplica = false
+ }
+
+ replicaManager.checkpointHighWatermarks(); // one checkpoint during setup, outside the measured method
+ }
+
+ @TearDown(Level.Trial) // releases what setup() created and deletes the log directories
+ public void tearDown() throws Exception {
+ this.replicaManager.shutdown(false); // presumably skips a final high-watermark checkpoint - confirm against ReplicaManager.shutdown
+ this.metrics.close();
+ this.scheduler.shutdown();
+ this.quotaManagers.shutdown(); // NOTE(review): no shutdown call on logManager in this method - confirm it is not needed
+ for (File dir : CollectionConverters.asJavaCollection(logManager.liveLogDirs())) { // delete each live log dir from disk
+ Utils.delete(dir);
+ }
+ }
+
+
+ @Benchmark // the measured operation
+ @Threads(1) // run with a single benchmark thread
+ public void measureCheckpointHighWatermarks() {
+ this.replicaManager.checkpointHighWatermarks(); // the only call being timed
+ }
+}