You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/12 19:21:51 UTC
git commit: KAFKA-800 inSyncReplica in Partition needs some tweaks; reviewed by Jun Rao and Neha Narkhede
Updated Branches:
refs/heads/0.8 da1dc1788 -> 485afe646
KAFKA-800 inSyncReplica in Partition needs some tweaks; reviewed by Jun Rao and Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/485afe64
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/485afe64
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/485afe64
Branch: refs/heads/0.8
Commit: 485afe646af282c59927b177ddc70742349cdad8
Parents: da1dc17
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Mar 12 11:21:30 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Mar 12 11:21:44 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/cluster/Partition.scala | 20 +++++++++------
.../main/scala/kafka/server/ReplicaManager.scala | 6 +++-
2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/485afe64/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 824e394..6e73003 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -67,7 +67,9 @@ class Partition(val topic: String,
)
def isUnderReplicated(): Boolean = {
- inSyncReplicas.size < replicationFactor
+ leaderIsrUpdateLock synchronized {
+ inSyncReplicas.size < replicationFactor
+ }
}
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
@@ -339,12 +341,14 @@ class Partition(val topic: String,
}
override def toString(): String = {
- val partitionString = new StringBuilder
- partitionString.append("Topic: " + topic)
- partitionString.append("; Partition: " + partitionId)
- partitionString.append("; Leader: " + leaderReplicaIdOpt)
- partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
- partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
- partitionString.toString()
+ leaderIsrUpdateLock synchronized {
+ val partitionString = new StringBuilder
+ partitionString.append("Topic: " + topic)
+ partitionString.append("; Partition: " + partitionId)
+ partitionString.append("; Leader: " + leaderReplicaIdOpt)
+ partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+ partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+ partitionString.toString()
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/485afe64/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 53e34ec..68e712c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -57,7 +57,11 @@ class ReplicaManager(val config: KafkaConfig,
newGauge(
"LeaderCount",
new Gauge[Int] {
- def getValue = leaderPartitions.size
+ def getValue = {
+ leaderPartitionsLock synchronized {
+ leaderPartitions.size
+ }
+ }
}
)
newGauge(