You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/07 00:32:28 UTC

kafka git commit: KAFKA-4458; add per partition in-sync and assigned replica count (KIP-96)

Repository: kafka
Updated Branches:
  refs/heads/trunk b6af35161 -> 9e72c12e9


KAFKA-4458; add per partition in-sync and assigned replica count (KIP-96)

Author: Xavier Léauté <xa...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2186 from xvrl/per-partition-replica-metrics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e72c12e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e72c12e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e72c12e

Branch: refs/heads/trunk
Commit: 9e72c12e9811811997b6ca99a6dc56a9af46e43d
Parents: b6af351
Author: Xavier Léauté <xa...@confluent.io>
Authored: Wed Dec 7 00:28:56 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Dec 7 00:31:49 2016 +0000

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 32 +++++++++++++++-----
 1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e72c12e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c8db015..7e52a91 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -75,14 +75,28 @@ class Partition(val topic: String,
     tags
   )
 
-  def isUnderReplicated(): Boolean = {
-    leaderReplicaIfLocal() match {
-      case Some(_) =>
-        inSyncReplicas.size < assignedReplicas.size
-      case None =>
-        false
-    }
-  }
+  newGauge("InSyncReplicasCount",
+    new Gauge[Int] {
+      def value = {
+        if (isLeaderReplicaLocal) inSyncReplicas.size else 0
+      }
+    },
+    tags
+  )
+
+  newGauge("ReplicasCount",
+    new Gauge[Int] {
+      def value = {
+        if (isLeaderReplicaLocal) assignedReplicas.size else 0
+      }
+    },
+    tags
+  )
+
+  private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined
+
+  def isUnderReplicated: Boolean =
+    isLeaderReplicaLocal && inSyncReplicas.size < assignedReplicas.size
 
   def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
     val replicaOpt = getReplica(replicaId)
@@ -480,6 +494,8 @@ class Partition(val topic: String,
    */
   private def removePartitionMetrics() {
     removeMetric("UnderReplicated", tags)
+    removeMetric("InSyncReplicasCount", tags)
+    removeMetric("ReplicasCount", tags)
   }
 
   override def equals(that: Any): Boolean = {