You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/28 19:56:39 UTC
[kafka] branch trunk updated: MINOR: Remove redundant checkpoint thread started field in ReplicaManager (#6813)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4a3c92b MINOR: Remove redundant checkpoint thread started field in ReplicaManager (#6813)
4a3c92b is described below
commit 4a3c92bfc78044e1a325543e906ad38a2cfbb39c
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 28 12:56:22 2019 -0700
MINOR: Remove redundant checkpoint thread started field in ReplicaManager (#6813)
We have two fields `highWatermarkCheckPointThreadStarted` and `hwThreadInitialized` which appear to be serving the same purpose. This patch gets rid of `hwThreadInitialized`.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
core/src/main/scala/kafka/server/ReplicaManager.scala | 11 +++--------
1 file changed, 3 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 54d35ef..55663d3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -201,7 +201,6 @@ class ReplicaManager(val config: KafkaConfig,
@volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
(dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
- private var hwThreadInitialized = false
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
@@ -265,8 +264,8 @@ class ReplicaManager(val config: KafkaConfig,
def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated)
- def startHighWaterMarksCheckPointThread() = {
- if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
+ def startHighWatermarkCheckPointThread() = {
+ if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
}
@@ -1136,13 +1135,9 @@ class ReplicaManager(val config: KafkaConfig,
markPartitionOffline(topicPartition)
}
-
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
// have been completely populated before starting the checkpointing there by avoiding weird race conditions
- if (!hwThreadInitialized) {
- startHighWaterMarksCheckPointThread()
- hwThreadInitialized = true
- }
+ startHighWatermarkCheckPointThread()
val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
for (partition <- newPartitions) {