You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/29 01:44:35 UTC

kafka git commit: KAFKA-1902; fix MetricName so that Yammer reporter can work correctly; patched by Jun Rao; reviewed by Manikumar Reddy, Manikumar Reddy and Joel Koshy

Repository: kafka
Updated Branches:
  refs/heads/0.8.2 0b312a6b9 -> 2a1e3d451


KAFKA-1902; fix MetricName so that Yammer reporter can work correctly; patched by Jun Rao; reviewed by Manikumar Reddy, Manikumar Reddy and Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a1e3d45
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a1e3d45
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a1e3d45

Branch: refs/heads/0.8.2
Commit: 2a1e3d4510e8fadb0cad0cb7290baf54aae39c23
Parents: 0b312a6
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Jan 28 18:44:16 2015 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 28 18:44:16 2015 -0600

----------------------------------------------------------------------
 .../scala/kafka/metrics/KafkaMetricsGroup.scala | 27 ++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1e3d45/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index e9e4918..9e31184 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -61,9 +61,15 @@ trait KafkaMetricsGroup extends Logging {
       nameBuilder.append(name)
     }
 
-    KafkaMetricsGroup.toMBeanName(tags).map(mbeanName => nameBuilder.append(",").append(mbeanName))
+    val scope: String = KafkaMetricsGroup.toScope(tags).getOrElse(null)
+    val tagsName = KafkaMetricsGroup.toMBeanName(tags)
+    tagsName match {
+      case Some(tn) =>
+        nameBuilder.append(",").append(tn)
+      case None =>
+    }
 
-    new MetricName(group, typeName, name, null, nameBuilder.toString())
+    new MetricName(group, typeName, name, scope, nameBuilder.toString())
   }
 
   def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
@@ -160,6 +166,23 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
     }
   }
 
+  private def toScope(tags: collection.Map[String, String]): Option[String] = { // builds a dotted "key.value" scope string from the tags; None when no tag has a non-empty value
+    val filteredTags = tags
+      .filter { case (tagKey, tagValue) => tagValue != ""} // drop tags whose value is the empty string
+    if (filteredTags.nonEmpty) {
+      // convert dot to _ in tag values (keys are left as-is) since reporters like Graphite typically use dot to represent hierarchy
+      val tagsString = filteredTags
+        .toList.sortWith((t1, t2) => t1._1 < t2._1) // order by tag key so the output does not depend on map iteration order
+        .map { case (key, value) => "%s.%s".format(key, value.replaceAll("\\.", "_"))} // each tag becomes "key.value"
+        .mkString(".") // the pairs themselves are also joined with "."
+
+      Some(tagsString)
+    }
+    else {
+      None // every tag was filtered out (or tags was empty)
+    }
+  }
+
   def removeAllConsumerMetrics(clientId: String) {
     FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
     ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)