You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:01 UTC
[22/37] git commit: KAFKA-769 On startup, a broker's high watermark for
every topic partition gets reset to zero; reviewed by Neha Narkhede
KAFKA-769 On startup, a broker's high watermark for every topic partition gets reset to zero; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/144a0a2a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/144a0a2a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/144a0a2a
Branch: refs/heads/trunk
Commit: 144a0a2ac02ecf6297f4dee4ae773be59095b1e7
Parents: 0be45b3
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Fri Feb 22 15:26:03 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 22 15:26:13 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/server/ReplicaManager.scala | 9 +++++++--
1 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/144a0a2a/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1044085..4e6c8ea 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -49,6 +49,7 @@ class ReplicaManager(val config: KafkaConfig,
this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
+ private var hwThreadInitialized = false
newGauge(
"LeaderCount",
@@ -92,8 +93,6 @@ class ReplicaManager(val config: KafkaConfig,
def startup() {
// start ISR expiration thread
kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs)
- // start high watermark checkpoint thread
- startHighWaterMarksCheckPointThread()
}
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
@@ -209,6 +208,12 @@ class ReplicaManager(val config: KafkaConfig,
responseMap.put(topicAndPartition, errorCode)
}
info("Completed leader and isr request %s".format(leaderAndISRRequest))
+ // We initialize the high watermark checkpoint thread after the first LeaderAndIsr request. This ensures that all the partitions
+ // have been completely populated before checkpointing starts, thereby avoiding race conditions
+ if (!hwThreadInitialized) {
+ startHighWaterMarksCheckPointThread()
+ hwThreadInitialized = true
+ }
replicaFetcherManager.shutdownIdleFetcherThreads()
(responseMap, ErrorMapping.NoError)
}