You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:18 UTC

[21/37] git commit: KAFKA-1180 WhiteList topic filter gets a NullPointerException on complex Regex patch by Joe Stein, reviewed by Joel Koshy

KAFKA-1180 WhiteList topic filter gets a NullPointerException on complex Regex patch by Joe Stein, reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/592678e4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/592678e4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/592678e4

Branch: refs/heads/transactional_messaging
Commit: 592678e4d72151940002ffb1a367ec433b27f2ef
Parents: fc0e03f
Author: Joe Stein <jo...@stealth.ly>
Authored: Sat Jul 19 21:00:22 2014 -0400
Committer: Joe Stein <jo...@stealth.ly>
Committed: Sat Jul 19 21:00:22 2014 -0400

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/TopicCount.scala  |  4 +--
 core/src/main/scala/kafka/utils/Utils.scala     | 24 ++++++++++++++++++
 .../unit/kafka/consumer/TopicFilterTest.scala   | 26 ++++++++++++++++++++
 3 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/592678e4/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c793110..8b0ae57 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -19,7 +19,7 @@ package kafka.consumer
 
 import scala.collection._
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
+import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils}
 import kafka.common.KafkaException
 
 private[kafka] trait TopicCount {
@@ -127,7 +127,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
     makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
   }
 
-  def getTopicCountMap = Map(topicFilter.regex -> numStreams)
+  def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams)
 
   def pattern: String = {
     topicFilter match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/592678e4/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 63d3dda..6576adf 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -545,4 +545,28 @@ object Utils extends Logging {
       lock.unlock()
     }
   }
+
+  // Escapes s for embedding inside a JSON string literal (ECMA-404, http://json.org).
+  def JSONEscapeString(s: String): String = {
+    s.map {
+      case '"'  => "\\\""
+      case '\\' => "\\\\"
+      case '/'  => "\\/"
+      case '\b' => "\\b"
+      case '\f' => "\\f"
+      case '\n' => "\\n"
+      case '\r' => "\\r"
+      case '\t' => "\\t"
+      /* Unicode-escape the remaining control characters:
+       * 0x0 -> 0x1f  : ASCII Control (C0 Control Codes)
+       * 0x7f         : ASCII DELETE
+       * 0x80 -> 0x9f : C1 Control Codes
+       *
+       * Only the C0 codes must be escaped (RFC 4627, section 2.5); DELETE and
+       * the C1 codes are escaped as well, to be safe.
+       */
+      case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int)
+      case c => c.toString // String, not Char, so the mapped sequence is not widened to Any
+    }.mkString
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/592678e4/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index d903a6f..4f124af 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -43,6 +43,10 @@ class TopicFilterTest extends JUnitSuite {
     val topicFilter3 = new Whitelist("white_listed-topic.+")
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
     assertFalse(topicFilter3.isTopicAllowed("black1", excludeInternalTopics = true))
+
+    val topicFilter4 = new Whitelist("test-(?!bad\\b)[\\w]+")
+    assertTrue(topicFilter4.isTopicAllowed("test-good", excludeInternalTopics = true))
+    assertFalse(topicFilter4.isTopicAllowed("test-bad", excludeInternalTopics = true))    
   }
 
   @Test
@@ -56,4 +60,26 @@ class TopicFilterTest extends JUnitSuite {
     assertFalse(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
     assertTrue(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
   }
+
+  @Test
+  def testWildcardTopicCountGetTopicCountMapEscapeJson() {
+    def getTopicCountMapKey(regex: String): String = {
+      val topicCount = new WildcardTopicCount(null, "consumerId", new Whitelist(regex), 1, true)
+      topicCount.getTopicCountMap.head._1
+    }
+    // Keys saved to ZooKeeper as JSON must be escaped, or they break the parse when read back.
+    assertEquals("-\\\"-", getTopicCountMapKey("-\"-"))
+    assertEquals("-\\/-", getTopicCountMapKey("-/-"))
+    assertEquals("-\\\\b-", getTopicCountMapKey("-\\b-")) // regex \b: only the backslash is escaped
+    // Real control characters (not backslash sequences) must become JSON escape sequences.
+    assertEquals("-\\b-", getTopicCountMapKey("-\b-"))
+    assertEquals("-\\f-", getTopicCountMapKey("-\f-"))
+    assertEquals("-\\n-", getTopicCountMapKey("-\n-"))
+    assertEquals("-\\r-", getTopicCountMapKey("-\r-"))
+    assertEquals("-\\t-", getTopicCountMapKey("-\t-"))
+    assertEquals("-\\u0000-", getTopicCountMapKey("-" + '\u0000' + "-"))
+    assertEquals("-\\u001f-", getTopicCountMapKey("-" + '\u001f' + "-"))
+    assertEquals("-\\u007f-", getTopicCountMapKey("-" + '\u007f' + "-"))
+    assertEquals("-\\u009f-", getTopicCountMapKey("-" + '\u009f' + "-"))
+  }
 }
\ No newline at end of file