You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:18 UTC
[21/37] git commit: KAFKA-1180 WhiteList topic filter gets a
NullPointerException on complex Regex patch by Joe Stein,
reviewed by Joel Koshy
KAFKA-1180 WhiteList topic filter gets a NullPointerException on complex Regex patch by Joe Stein, reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/592678e4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/592678e4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/592678e4
Branch: refs/heads/transactional_messaging
Commit: 592678e4d72151940002ffb1a367ec433b27f2ef
Parents: fc0e03f
Author: Joe Stein <jo...@stealth.ly>
Authored: Sat Jul 19 21:00:22 2014 -0400
Committer: Joe Stein <jo...@stealth.ly>
Committed: Sat Jul 19 21:00:22 2014 -0400
----------------------------------------------------------------------
.../main/scala/kafka/consumer/TopicCount.scala | 4 +--
core/src/main/scala/kafka/utils/Utils.scala | 24 ++++++++++++++++++
.../unit/kafka/consumer/TopicFilterTest.scala | 26 ++++++++++++++++++++
3 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/592678e4/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c793110..8b0ae57 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -19,7 +19,7 @@ package kafka.consumer
import scala.collection._
import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
+import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils}
import kafka.common.KafkaException
private[kafka] trait TopicCount {
@@ -127,7 +127,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
}
- def getTopicCountMap = Map(topicFilter.regex -> numStreams)
+ def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams)
def pattern: String = {
topicFilter match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/592678e4/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 63d3dda..6576adf 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -545,4 +545,28 @@ object Utils extends Logging {
lock.unlock()
}
}
+
+ // Escapes s for use inside a JSON string literal per ECMA-404 (http://json.org); returns the escaped text, without surrounding quotes.
+ def JSONEscapeString (s : String) : String = {
+ s.map { // per-character rewrite; the pieces are joined by mkString below
+ case '"' => "\\\"" // quotation mark -> backslash + quotation mark
+ case '\\' => "\\\\" // reverse solidus -> two reverse solidi
+ case '/' => "\\/" // solidus -> backslash + solidus (JSON permits but does not require this escape)
+ case '\b' => "\\b" // backspace -> two-character short escape
+ case '\f' => "\\f" // form feed -> two-character short escape
+ case '\n' => "\\n" // line feed -> two-character short escape
+ case '\r' => "\\r" // carriage return -> two-character short escape
+ case '\t' => "\\t" // tab -> two-character short escape
+ /* Any remaining control character becomes a backslash-u escape with four lowercase hex digits:
+ * 0x0 -> 0x1f : ASCII Control (C0 Control Codes)
+ * 0x7f : ASCII DELETE
+ * 0x80 -> 0x9f : C1 Control Codes
+ *
+ * Per RFC4627, section 2.5, only the C0 range is required to be escaped;
+ * DELETE and the C1 codes are escaped as well to be safe.
+ */
+ case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int)
+ case c => c // any other character is emitted unchanged
+ }.mkString
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/592678e4/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index d903a6f..4f124af 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -43,6 +43,10 @@ class TopicFilterTest extends JUnitSuite {
val topicFilter3 = new Whitelist("white_listed-topic.+")
assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
assertFalse(topicFilter3.isTopicAllowed("black1", excludeInternalTopics = true))
+
+ val topicFilter4 = new Whitelist("test-(?!bad\\b)[\\w]+")
+ assertTrue(topicFilter4.isTopicAllowed("test-good", excludeInternalTopics = true))
+ assertFalse(topicFilter4.isTopicAllowed("test-bad", excludeInternalTopics = true))
}
@Test
@@ -56,4 +60,26 @@ class TopicFilterTest extends JUnitSuite {
assertFalse(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
assertTrue(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
}
+
+ @Test // checks that WildcardTopicCount.getTopicCountMap exposes its regex key in JSON-escaped form
+ def testWildcardTopicCountGetTopicCountMapEscapeJson() {
+ def getTopicCountMapKey(regex: String): String = { // helper: key of the first topic-count entry for a Whitelist built from regex
+ val topicCount = new WildcardTopicCount(null, "consumerId", new Whitelist(regex), 1, true) // ZkClient is passed as null here
+ topicCount.getTopicCountMap.head._1
+ }
+ // Let's make sure that the regex key is JSON-escaped the way we expect;
+ // if it is not, then once it is saved to ZooKeeper and read back out it will be broken on parse.
+ assertEquals("-\\\"-", getTopicCountMapKey("-\"-"))
+ assertEquals("-\\\\-", getTopicCountMapKey("-\\-"))
+ assertEquals("-\\/-", getTopicCountMapKey("-/-"))
+ assertEquals("-\\\\b-", getTopicCountMapKey("-\\b-")) // from here on the input is a literal backslash plus a letter, so only the backslash is escaped
+ assertEquals("-\\\\f-", getTopicCountMapKey("-\\f-"))
+ assertEquals("-\\\\n-", getTopicCountMapKey("-\\n-"))
+ assertEquals("-\\\\r-", getTopicCountMapKey("-\\r-"))
+ assertEquals("-\\\\t-", getTopicCountMapKey("-\\t-"))
+ assertEquals("-\\\\u0000-", getTopicCountMapKey("-\\u0000-"))
+ assertEquals("-\\\\u001f-", getTopicCountMapKey("-\\u001f-"))
+ assertEquals("-\\\\u007f-", getTopicCountMapKey("-\\u007f-"))
+ assertEquals("-\\\\u009f-", getTopicCountMapKey("-\\u009f-"))
+ }
}
\ No newline at end of file