You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/09 14:39:10 UTC

[1/2] kafka git commit: MINOR: Update Scala 2.11 to 2.11.12 [Forced Update!]

Repository: kafka
Updated Branches:
  refs/heads/1.0 64d064818 -> 13f036dd2 (forced update)


MINOR: Update Scala 2.11 to 2.11.12

The main change in Scala 2.11.12 is Java 9 support.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #4185 from ijuma/scala-2.11.12

(cherry picked from commit 4a55818bbfb8d104b1ef34c5b9d96184e8187c5c)
Signed-off-by: Ismael Juma <is...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13f036dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13f036dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13f036dd

Branch: refs/heads/1.0
Commit: 13f036dd20d8fe0f350aece52c211c2f9901427f
Parents: ff29724
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Nov 9 14:23:54 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 9 14:38:55 2017 +0000

----------------------------------------------------------------------
 gradle/dependencies.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/13f036dd/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 46ac7ed..cfb0b9b 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -29,7 +29,7 @@ ext {
 }
 
 // Add Scala version
-def defaultScala211Version = '2.11.11'
+def defaultScala211Version = '2.11.12'
 def defaultScala212Version = '2.12.4'
 if (hasProperty('scalaVersion')) {
   if (scalaVersion == '2.11') {


[2/2] kafka git commit: KAFKA-6156; Metric tag values with colons must be sanitized

Posted by ij...@apache.org.
KAFKA-6156; Metric tag values with colons must be sanitized

Windows directory paths often contain colons, which are not allowed in
yammer metrics. Metric tag values with special characters must be
quoted.

Author: huxihx <hu...@hotmail.com>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #4173 from huxihx/KAFKA-6156


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff297241
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff297241
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff297241

Branch: refs/heads/1.0
Commit: ff29724126e6b8763d1ca2fbad8aa2940fa57052
Parents: 5308927
Author: huxihx <hu...@hotmail.com>
Authored: Mon Nov 6 15:56:07 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 9 14:38:55 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/kafka/common/utils/Sanitizer.java     |  2 +-
 core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala |  5 +++--
 core/src/main/scala/kafka/utils/ZkUtils.scala             |  2 +-
 core/src/test/scala/unit/kafka/metrics/MetricsTest.scala  | 10 ++++++++++
 4 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff297241/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
index d35ea91..f921590 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
@@ -40,7 +40,7 @@ public class Sanitizer {
 
     /**
      * Even though only a small number of characters are disallowed in JMX, quote any
-     * string containing special characteres to be safe. All characters in strings sanitized
+     * string containing special characters to be safe. All characters in strings sanitized
      * using {@link #sanitize(String)} are safe for JMX and hence included here.
      */
     private static final Pattern MBEAN_PATTERN = Pattern.compile("[\\w-%\\. \t]*");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff297241/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 7f88d2f..75eaafd 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -24,6 +24,7 @@ import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
 import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
 import kafka.utils.Logging
+import org.apache.kafka.common.utils.Sanitizer
 
 import scala.collection.immutable
 import scala.collection.JavaConverters._
@@ -38,7 +39,7 @@ trait KafkaMetricsGroup extends Logging {
    * @param tags Additional attributes which mBean will have.
    * @return Sanitized metric name object.
    */
-  protected def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
+  def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
@@ -154,7 +155,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
   private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
     val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
     if (filteredTags.nonEmpty) {
-      val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, value) }.mkString(",")
+      val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, Sanitizer.jmxSanitize(value)) }.mkString(",")
       Some(tagsString)
     }
     else None

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff297241/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ca577bc..99609ed 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -233,7 +233,7 @@ class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
     extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
   val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
 
-  override protected def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
+  override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
     explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff297241/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index bff2136..f6e74bb 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -18,6 +18,7 @@
 package kafka.metrics
 
 import java.util.Properties
+import javax.management.ObjectName
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Meter, MetricPredicate}
@@ -99,6 +100,15 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
   }
 
+  @Test
+  def testWindowsStyleTagNames(): Unit = {
+    val path = "C:\\windows-path\\kafka-logs"
+    val tags = Map("dir" -> path)
+    val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=")
+    val metric = KafkaMetricsGroup.metricName("test-metric", tags)
+    assert(metric.getMBeanName.endsWith(expectedMBeanName))
+  }
+
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
   def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit = {
     sendMessages(servers, topic, nMessages)