You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/29 01:44:35 UTC
kafka git commit: KAFKA-1902;
fix MetricName so that Yammer reporter can work correctly; patched by Jun Rao;
reviewed by Manikumar Reddy, Manikumar Reddy and Joel Koshy
Repository: kafka
Updated Branches:
refs/heads/0.8.2 0b312a6b9 -> 2a1e3d451
KAFKA-1902; fix MetricName so that Yammer reporter can work correctly; patched by Jun Rao; reviewed by Manikumar Reddy, Manikumar Reddy and Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a1e3d45
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a1e3d45
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a1e3d45
Branch: refs/heads/0.8.2
Commit: 2a1e3d4510e8fadb0cad0cb7290baf54aae39c23
Parents: 0b312a6
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Jan 28 18:44:16 2015 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 28 18:44:16 2015 -0600
----------------------------------------------------------------------
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 27 ++++++++++++++++++--
1 file changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1e3d45/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index e9e4918..9e31184 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -61,9 +61,15 @@ trait KafkaMetricsGroup extends Logging {
nameBuilder.append(name)
}
- KafkaMetricsGroup.toMBeanName(tags).map(mbeanName => nameBuilder.append(",").append(mbeanName))
+ val scope: String = KafkaMetricsGroup.toScope(tags).getOrElse(null)
+ val tagsName = KafkaMetricsGroup.toMBeanName(tags)
+ tagsName match {
+ case Some(tn) =>
+ nameBuilder.append(",").append(tn)
+ case None =>
+ }
- new MetricName(group, typeName, name, null, nameBuilder.toString())
+ new MetricName(group, typeName, name, scope, nameBuilder.toString())
}
def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
@@ -160,6 +166,23 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
}
}
+ private def toScope(tags: collection.Map[String, String]): Option[String] = {
+ val filteredTags = tags
+ .filter { case (tagKey, tagValue) => tagValue != ""}
+ if (filteredTags.nonEmpty) {
+ // convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
+ val tagsString = filteredTags
+ .toList.sortWith((t1, t2) => t1._1 < t2._1)
+ .map { case (key, value) => "%s.%s".format(key, value.replaceAll("\\.", "_"))}
+ .mkString(".")
+
+ Some(tagsString)
+ }
+ else {
+ None
+ }
+ }
+
def removeAllConsumerMetrics(clientId: String) {
FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)