You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/07 00:32:28 UTC
kafka git commit: KAFKA-4458; add per partition in-sync and assigned replica count (KIP-96)
Repository: kafka
Updated Branches:
refs/heads/trunk b6af35161 -> 9e72c12e9
KAFKA-4458; add per partition in-sync and assigned replica count (KIP-96)
Author: Xavier Léauté <xa...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2186 from xvrl/per-partition-replica-metrics
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e72c12e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e72c12e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e72c12e
Branch: refs/heads/trunk
Commit: 9e72c12e9811811997b6ca99a6dc56a9af46e43d
Parents: b6af351
Author: Xavier Léauté <xa...@confluent.io>
Authored: Wed Dec 7 00:28:56 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Dec 7 00:31:49 2016 +0000
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 32 +++++++++++++++-----
1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e72c12e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c8db015..7e52a91 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -75,14 +75,28 @@ class Partition(val topic: String,
tags
)
- def isUnderReplicated(): Boolean = {
- leaderReplicaIfLocal() match {
- case Some(_) =>
- inSyncReplicas.size < assignedReplicas.size
- case None =>
- false
- }
- }
+ newGauge("InSyncReplicasCount",
+ new Gauge[Int] {
+ def value = {
+ if (isLeaderReplicaLocal) inSyncReplicas.size else 0
+ }
+ },
+ tags
+ )
+
+ newGauge("ReplicasCount",
+ new Gauge[Int] {
+ def value = {
+ if (isLeaderReplicaLocal) assignedReplicas.size else 0
+ }
+ },
+ tags
+ )
+
+ private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined
+
+ def isUnderReplicated: Boolean =
+ isLeaderReplicaLocal && inSyncReplicas.size < assignedReplicas.size
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
val replicaOpt = getReplica(replicaId)
@@ -480,6 +494,8 @@ class Partition(val topic: String,
*/
private def removePartitionMetrics() {
removeMetric("UnderReplicated", tags)
+ removeMetric("InSyncReplicasCount", tags)
+ removeMetric("ReplicasCount", tags)
}
override def equals(that: Any): Boolean = {