Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:24 UTC

[27/37] git commit: KAFKA-1483 Split Brain about Leader Partitions; reviewed by Guozhang, Jun and Neha

KAFKA-1483 Split Brain about Leader Partitions; reviewed by Guozhang, Jun and Neha


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9e5080d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9e5080d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9e5080d

Branch: refs/heads/transactional_messaging
Commit: d9e5080dfceff12ea599f8d518ad402ba7d54c9d
Parents: db41f98
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Thu Jul 24 16:48:21 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Jul 24 16:48:41 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/ReplicaManager.scala     | 35 +++++---------------
 1 file changed, 9 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
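
The patch removes the separately maintained leaderPartitions HashSet (and its
leaderPartitionsLock), which had to be updated in lock-step with allPartitions
and could disagree with it (the "split brain" in the ticket title). Leader
partitions are now derived on demand via getLeaderPartitions(), which filters
allPartitions for partitions whose leader replica is local, and maybeShrinkIsr()
iterates over allPartitions directly. A minimal sketch of that derive-on-demand
pattern, using simplified stand-in types rather than the actual Kafka classes,
might look like this:

----------------------------------------------------------------------
import scala.collection.concurrent.TrieMap

// Simplified stand-ins for the real kafka.cluster classes (hypothetical, for
// illustration only).
case class Replica(brokerId: Int)
case class Partition(topic: String, id: Int, leaderReplicaIfLocal: Option[Replica])

object LeaderPartitionsSketch {
  // Stand-in for ReplicaManager.allPartitions, keyed by (topic, partition).
  private val allPartitions = TrieMap.empty[(String, Int), Partition]

  // Analogue of the new getLeaderPartitions(): filter the single source of
  // truth instead of tracking leadership in a second mutable collection.
  def getLeaderPartitions(): List[Partition] =
    allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList

  def main(args: Array[String]): Unit = {
    allPartitions.put(("orders", 0), Partition("orders", 0, Some(Replica(1))))
    allPartitions.put(("orders", 1), Partition("orders", 1, None))
    println(getLeaderPartitions().map(p => s"${p.topic}-${p.id}"))  // List(orders-0)
  }
}
----------------------------------------------------------------------

Deriving the leader set this way trades a small amount of extra work per metric
read and ISR check for the guarantee that the answer can never drift out of sync
with allPartitions, which is what lets the lock be dropped entirely.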


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9e5080d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6a56a77..897783c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,9 +36,9 @@ object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
 }
 
-class ReplicaManager(val config: KafkaConfig, 
-                     time: Time, 
-                     val zkClient: ZkClient, 
+class ReplicaManager(val config: KafkaConfig,
+                     time: Time,
+                     val zkClient: ZkClient,
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
@@ -46,8 +46,6 @@ class ReplicaManager(val config: KafkaConfig,
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
   private val allPartitions = new Pool[(String, Int), Partition]
-  private var leaderPartitions = new mutable.HashSet[Partition]()
-  private val leaderPartitionsLock = new Object
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
@@ -60,9 +58,7 @@ class ReplicaManager(val config: KafkaConfig,
     "LeaderCount",
     new Gauge[Int] {
       def value = {
-        leaderPartitionsLock synchronized {
-          leaderPartitions.size
-        }
+          getLeaderPartitions().size
       }
     }
   )
@@ -82,9 +78,7 @@ class ReplicaManager(val config: KafkaConfig,
   val isrShrinkRate = newMeter("IsrShrinksPerSec",  "shrinks", TimeUnit.SECONDS)
 
   def underReplicatedPartitionCount(): Int = {
-    leaderPartitionsLock synchronized {
-      leaderPartitions.count(_.isUnderReplicated)
-    }
+      getLeaderPartitions().count(_.isUnderReplicated)
   }
 
   def startHighWaterMarksCheckPointThread() = {
@@ -117,9 +111,6 @@ class ReplicaManager(val config: KafkaConfig,
     val errorCode = ErrorMapping.NoError
     getPartition(topic, partitionId) match {
       case Some(partition) =>
-        leaderPartitionsLock synchronized {
-          leaderPartitions -= partition
-        }
         if(deletePartition) {
           val removedPartition = allPartitions.remove((topic, partitionId))
           if (removedPartition != null)
@@ -331,10 +322,6 @@ class ReplicaManager(val config: KafkaConfig,
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
 
-      // Finally add these partitions to the list of partitions for which the leader is the current broker
-      leaderPartitionsLock synchronized {
-        leaderPartitions ++= partitionState.keySet
-      }
     } catch {
       case e: Throwable =>
         partitionState.foreach { state =>
@@ -383,9 +370,6 @@ class ReplicaManager(val config: KafkaConfig,
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
     try {
-      leaderPartitionsLock synchronized {
-        leaderPartitions --= partitionState.keySet
-      }
 
       var partitionsToMakeFollower: Set[Partition] = Set()
 
@@ -464,11 +448,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
-    var curLeaderPartitions: List[Partition] = null
-    leaderPartitionsLock synchronized {
-      curLeaderPartitions = leaderPartitions.toList
-    }
-    curLeaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
+    allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
   }
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
@@ -480,6 +460,9 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private def getLeaderPartitions() : List[Partition] = {
+    allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList
+  }
   /**
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */