You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/06 15:59:43 UTC
kafka git commit: KAFKA-6156;
Metric tag values with colons must be sanitized
Repository: kafka
Updated Branches:
refs/heads/trunk 86062e9a7 -> 7672e9ec3
KAFKA-6156; Metric tag values with colons must be sanitized
Windows directory paths often contain colons which are not allowed in
yammer metrics. Metric tag values with special characters must be
quoted.
Author: huxihx <hu...@hotmail.com>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #4173 from huxihx/KAFKA-6156
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7672e9ec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7672e9ec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7672e9ec
Branch: refs/heads/trunk
Commit: 7672e9ec3def7af6797bc0ecf254ac694efdfad5
Parents: 86062e9
Author: huxihx <hu...@hotmail.com>
Authored: Mon Nov 6 15:56:07 2017 +0000
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Mon Nov 6 15:59:23 2017 +0000
----------------------------------------------------------------------
.../java/org/apache/kafka/common/utils/Sanitizer.java | 2 +-
core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala | 3 ++-
core/src/test/scala/unit/kafka/metrics/MetricsTest.scala | 10 ++++++++++
3 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7672e9ec/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
index d35ea91..f921590 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
@@ -40,7 +40,7 @@ public class Sanitizer {
/**
* Even though only a small number of characters are disallowed in JMX, quote any
- * string containing special characteres to be safe. All characters in strings sanitized
+ * string containing special characters to be safe. All characters in strings sanitized
* using {@link #sanitize(String)} are safe for JMX and hence included here.
*/
private static final Pattern MBEAN_PATTERN = Pattern.compile("[\\w-%\\. \t]*");
http://git-wip-us.apache.org/repos/asf/kafka/blob/7672e9ec/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index a3a53e1..75eaafd 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -24,6 +24,7 @@ import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
import kafka.utils.Logging
+import org.apache.kafka.common.utils.Sanitizer
import scala.collection.immutable
import scala.collection.JavaConverters._
@@ -154,7 +155,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
if (filteredTags.nonEmpty) {
- val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, value) }.mkString(",")
+ val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, Sanitizer.jmxSanitize(value)) }.mkString(",")
Some(tagsString)
}
else None
http://git-wip-us.apache.org/repos/asf/kafka/blob/7672e9ec/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 291dfce..0dcff53 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -18,6 +18,7 @@
package kafka.metrics
import java.util.Properties
+import javax.management.ObjectName
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{Meter, MetricPredicate}
@@ -99,6 +100,15 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
}
+ @Test
+ def testWindowsStyleTagNames(): Unit = {
+ val path = "C:\\windows-path\\kafka-logs"
+ val tags = Map("dir" -> path)
+ val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=")
+ val metric = KafkaMetricsGroup.metricName("test-metric", tags)
+ assert(metric.getMBeanName.endsWith(expectedMBeanName))
+ }
+
@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit = {
sendMessages(servers, topic, nMessages)