You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/06 15:59:43 UTC

kafka git commit: KAFKA-6156; Metric tag values with colons must be sanitized

Repository: kafka
Updated Branches:
  refs/heads/trunk 86062e9a7 -> 7672e9ec3


KAFKA-6156; Metric tag values with colons must be sanitized

Windows directory paths often contain colons, which are not allowed
unquoted in the MBean names of Yammer metrics. Metric tag values with
special characters must be quoted.

Author: huxihx <hu...@hotmail.com>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #4173 from huxihx/KAFKA-6156


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7672e9ec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7672e9ec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7672e9ec

Branch: refs/heads/trunk
Commit: 7672e9ec3def7af6797bc0ecf254ac694efdfad5
Parents: 86062e9
Author: huxihx <hu...@hotmail.com>
Authored: Mon Nov 6 15:56:07 2017 +0000
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Mon Nov 6 15:59:23 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/kafka/common/utils/Sanitizer.java     |  2 +-
 core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala |  3 ++-
 core/src/test/scala/unit/kafka/metrics/MetricsTest.scala  | 10 ++++++++++
 3 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7672e9ec/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
index d35ea91..f921590 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
@@ -40,7 +40,7 @@ public class Sanitizer {
 
     /**
      * Even though only a small number of characters are disallowed in JMX, quote any
-     * string containing special characteres to be safe. All characters in strings sanitized
+     * string containing special characters to be safe. All characters in strings sanitized
      * using {@link #sanitize(String)} are safe for JMX and hence included here.
      */
     private static final Pattern MBEAN_PATTERN = Pattern.compile("[\\w-%\\. \t]*");

http://git-wip-us.apache.org/repos/asf/kafka/blob/7672e9ec/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index a3a53e1..75eaafd 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -24,6 +24,7 @@ import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
 import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
 import kafka.utils.Logging
+import org.apache.kafka.common.utils.Sanitizer
 
 import scala.collection.immutable
 import scala.collection.JavaConverters._
@@ -154,7 +155,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
   private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
     val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
     if (filteredTags.nonEmpty) {
-      val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, value) }.mkString(",")
+      val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, Sanitizer.jmxSanitize(value)) }.mkString(",")
       Some(tagsString)
     }
     else None

http://git-wip-us.apache.org/repos/asf/kafka/blob/7672e9ec/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 291dfce..0dcff53 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -18,6 +18,7 @@
 package kafka.metrics
 
 import java.util.Properties
+import javax.management.ObjectName
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Meter, MetricPredicate}
@@ -99,6 +100,15 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
   }
 
+  @Test
+  def testWindowsStyleTagNames(): Unit = {
+    val path = "C:\\windows-path\\kafka-logs"
+    val tags = Map("dir" -> path)
+    val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=")
+    val metric = KafkaMetricsGroup.metricName("test-metric", tags)
+    assert(metric.getMBeanName.endsWith(expectedMBeanName))
+  }
+
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
   def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit = {
     sendMessages(servers, topic, nMessages)