You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/07/25 01:48:46 UTC
git commit: KAFKA-1483 Split Brain about Leader Partitions;
reviewed by Guozhang, Jun and Neha
Repository: kafka
Updated Branches:
refs/heads/trunk db41f98ea -> d9e5080df
KAFKA-1483 Split Brain about Leader Partitions; reviewed by Guozhang, Jun and Neha
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9e5080d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9e5080d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9e5080d
Branch: refs/heads/trunk
Commit: d9e5080dfceff12ea599f8d518ad402ba7d54c9d
Parents: db41f98
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Thu Jul 24 16:48:21 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Jul 24 16:48:41 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/server/ReplicaManager.scala | 35 +++++---------------
1 file changed, 9 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9e5080d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6a56a77..897783c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,9 +36,9 @@ object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
}
-class ReplicaManager(val config: KafkaConfig,
- time: Time,
- val zkClient: ZkClient,
+class ReplicaManager(val config: KafkaConfig,
+ time: Time,
+ val zkClient: ZkClient,
scheduler: Scheduler,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
@@ -46,8 +46,6 @@ class ReplicaManager(val config: KafkaConfig,
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
private val localBrokerId = config.brokerId
private val allPartitions = new Pool[(String, Int), Partition]
- private var leaderPartitions = new mutable.HashSet[Partition]()
- private val leaderPartitionsLock = new Object
private val replicaStateChangeLock = new Object
val replicaFetcherManager = new ReplicaFetcherManager(config, this)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
@@ -60,9 +58,7 @@ class ReplicaManager(val config: KafkaConfig,
"LeaderCount",
new Gauge[Int] {
def value = {
- leaderPartitionsLock synchronized {
- leaderPartitions.size
- }
+ getLeaderPartitions().size
}
}
)
@@ -82,9 +78,7 @@ class ReplicaManager(val config: KafkaConfig,
val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
def underReplicatedPartitionCount(): Int = {
- leaderPartitionsLock synchronized {
- leaderPartitions.count(_.isUnderReplicated)
- }
+ getLeaderPartitions().count(_.isUnderReplicated)
}
def startHighWaterMarksCheckPointThread() = {
@@ -117,9 +111,6 @@ class ReplicaManager(val config: KafkaConfig,
val errorCode = ErrorMapping.NoError
getPartition(topic, partitionId) match {
case Some(partition) =>
- leaderPartitionsLock synchronized {
- leaderPartitions -= partition
- }
if(deletePartition) {
val removedPartition = allPartitions.remove((topic, partitionId))
if (removedPartition != null)
@@ -331,10 +322,6 @@ class ReplicaManager(val config: KafkaConfig,
partitionState.foreach{ case (partition, partitionStateInfo) =>
partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
- // Finally add these partitions to the list of partitions for which the leader is the current broker
- leaderPartitionsLock synchronized {
- leaderPartitions ++= partitionState.keySet
- }
} catch {
case e: Throwable =>
partitionState.foreach { state =>
@@ -383,9 +370,6 @@ class ReplicaManager(val config: KafkaConfig,
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
try {
- leaderPartitionsLock synchronized {
- leaderPartitions --= partitionState.keySet
- }
var partitionsToMakeFollower: Set[Partition] = Set()
@@ -464,11 +448,7 @@ class ReplicaManager(val config: KafkaConfig,
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
- var curLeaderPartitions: List[Partition] = null
- leaderPartitionsLock synchronized {
- curLeaderPartitions = leaderPartitions.toList
- }
- curLeaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
+ allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
}
def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
@@ -480,6 +460,9 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ private def getLeaderPartitions() : List[Partition] = {
+ allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList
+ }
/**
* Flushes the highwatermark value for all partitions to the highwatermark file
*/