You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/09 14:39:11 UTC

[2/2] kafka git commit: KAFKA-6156; Metric tag values with colons must be sanitized

KAFKA-6156; Metric tag values with colons must be sanitized

Windows directory paths often contain colons, which are not allowed in
yammer metrics. Metric tag values with special characters must be
quoted.

Author: huxihx <hu...@hotmail.com>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #4173 from huxihx/KAFKA-6156


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff297241
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff297241
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff297241

Branch: refs/heads/1.0
Commit: ff29724126e6b8763d1ca2fbad8aa2940fa57052
Parents: 5308927
Author: huxihx <hu...@hotmail.com>
Authored: Mon Nov 6 15:56:07 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 9 14:38:55 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/kafka/common/utils/Sanitizer.java     |  2 +-
 core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala |  5 +++--
 core/src/main/scala/kafka/utils/ZkUtils.scala             |  2 +-
 core/src/test/scala/unit/kafka/metrics/MetricsTest.scala  | 10 ++++++++++
 4 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff297241/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
index d35ea91..f921590 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
@@ -40,7 +40,7 @@ public class Sanitizer {
 
     /**
      * Even though only a small number of characters are disallowed in JMX, quote any
-     * string containing special characteres to be safe. All characters in strings sanitized
+     * string containing special characters to be safe. All characters in strings sanitized
      * using {@link #sanitize(String)} are safe for JMX and hence included here.
      */
     private static final Pattern MBEAN_PATTERN = Pattern.compile("[\\w-%\\. \t]*");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff297241/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 7f88d2f..75eaafd 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -24,6 +24,7 @@ import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
 import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
 import kafka.utils.Logging
+import org.apache.kafka.common.utils.Sanitizer
 
 import scala.collection.immutable
 import scala.collection.JavaConverters._
@@ -38,7 +39,7 @@ trait KafkaMetricsGroup extends Logging {
    * @param tags Additional attributes which mBean will have.
    * @return Sanitized metric name object.
    */
-  protected def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
+  def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
@@ -154,7 +155,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
   private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
     val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
     if (filteredTags.nonEmpty) {
-      val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, value) }.mkString(",")
+      val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, Sanitizer.jmxSanitize(value)) }.mkString(",")
       Some(tagsString)
     }
     else None

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff297241/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ca577bc..99609ed 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -233,7 +233,7 @@ class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
     extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
   val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
 
-  override protected def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
+  override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
     explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff297241/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index bff2136..f6e74bb 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -18,6 +18,7 @@
 package kafka.metrics
 
 import java.util.Properties
+import javax.management.ObjectName
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Meter, MetricPredicate}
@@ -99,6 +100,15 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
   }
 
+  @Test
+  def testWindowsStyleTagNames(): Unit = {
+    val path = "C:\\windows-path\\kafka-logs"
+    val tags = Map("dir" -> path)
+    val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=")
+    val metric = KafkaMetricsGroup.metricName("test-metric", tags)
+    assert(metric.getMBeanName.endsWith(expectedMBeanName))
+  }
+
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
   def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit = {
     sendMessages(servers, topic, nMessages)