You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/12 19:21:51 UTC

git commit: KAFKA-800 inSyncReplica in Partition needs some tweaks; reviewed by Jun Rao and Neha Narkhede

Updated Branches:
  refs/heads/0.8 da1dc1788 -> 485afe646


KAFKA-800 inSyncReplica in Partition needs some tweaks; reviewed by Jun Rao and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/485afe64
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/485afe64
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/485afe64

Branch: refs/heads/0.8
Commit: 485afe646af282c59927b177ddc70742349cdad8
Parents: da1dc17
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Mar 12 11:21:30 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Mar 12 11:21:44 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Partition.scala  |   20 +++++++++------
 .../main/scala/kafka/server/ReplicaManager.scala   |    6 +++-
 2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/485afe64/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 824e394..6e73003 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -67,7 +67,9 @@ class Partition(val topic: String,
   )
 
   def isUnderReplicated(): Boolean = {
-    inSyncReplicas.size < replicationFactor
+    leaderIsrUpdateLock synchronized {
+      inSyncReplicas.size < replicationFactor
+    }
   }
 
   def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
@@ -339,12 +341,14 @@ class Partition(val topic: String,
   }
 
   override def toString(): String = {
-    val partitionString = new StringBuilder
-    partitionString.append("Topic: " + topic)
-    partitionString.append("; Partition: " + partitionId)
-    partitionString.append("; Leader: " + leaderReplicaIdOpt)
-    partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
-    partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
-    partitionString.toString()
+    leaderIsrUpdateLock synchronized {
+      val partitionString = new StringBuilder
+      partitionString.append("Topic: " + topic)
+      partitionString.append("; Partition: " + partitionId)
+      partitionString.append("; Leader: " + leaderReplicaIdOpt)
+      partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+      partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+      partitionString.toString()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/485afe64/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 53e34ec..68e712c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -57,7 +57,11 @@ class ReplicaManager(val config: KafkaConfig,
   newGauge(
     "LeaderCount",
     new Gauge[Int] {
-      def getValue = leaderPartitions.size
+      def getValue = {
+        leaderPartitionsLock synchronized {
+          leaderPartitions.size
+        }
+      }
     }
   )
   newGauge(