You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:01 UTC

[22/37] git commit: KAFKA-769 On startup, a broker's highwatermark for every topic partition gets reset to zero; reviewed by Neha Narkhede

KAFKA-769 On startup, a broker's highwatermark for every topic partition gets reset to zero; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/144a0a2a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/144a0a2a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/144a0a2a

Branch: refs/heads/trunk
Commit: 144a0a2ac02ecf6297f4dee4ae773be59095b1e7
Parents: 0be45b3
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Fri Feb 22 15:26:03 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 22 15:26:13 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/server/ReplicaManager.scala   |    9 +++++++--
 1 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/144a0a2a/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1044085..4e6c8ea 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -49,6 +49,7 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
+  private var hwThreadInitialized = false
 
   newGauge(
     "LeaderCount",
@@ -92,8 +93,6 @@ class ReplicaManager(val config: KafkaConfig,
   def startup() {
     // start ISR expiration thread
     kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs)
-    // start high watermark checkpoint thread
-    startHighWaterMarksCheckPointThread()
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
@@ -209,6 +208,12 @@ class ReplicaManager(val config: KafkaConfig,
         responseMap.put(topicAndPartition, errorCode)
       }
       info("Completed leader and isr request %s".format(leaderAndISRRequest))
+      // We initialize the highwatermark checkpoint thread after the first leader and isr request. This ensures that all
+      // the partitions have been completely populated before checkpointing starts, thereby avoiding weird race conditions
+      if (!hwThreadInitialized) {
+        startHighWaterMarksCheckPointThread()
+        hwThreadInitialized = true
+      }
       replicaFetcherManager.shutdownIdleFetcherThreads()
       (responseMap, ErrorMapping.NoError)
     }