You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/07 05:48:51 UTC

svn commit: r1381853 - in /incubator/kafka/trunk/core/src: main/scala/kafka/log/LogManager.scala main/scala/kafka/server/KafkaConfig.scala main/scala/kafka/utils/TopicNameValidator.scala test/scala/unit/kafka/log/LogManagerTest.scala

Author: junrao
Date: Fri Sep  7 03:48:50 2012
New Revision: 1381853

URL: http://svn.apache.org/viewvc?rev=1381853&view=rev
Log:
Handle topic names with / on Kafka server; patched by Swapnil Ghike; reviewed by Jay Kreps and Jun Rao; kafka-495

Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1381853&r1=1381852&r2=1381853&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Fri Sep  7 03:48:50 2012
@@ -49,6 +49,7 @@ private[kafka] class LogManager(val conf
   private var zkActor: Actor = null
   private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
+  private val topicNameValidator = new TopicNameValidator(config)
   private val logFlushIntervalMap = config.flushIntervalMap
   private val logRetentionSizeMap = config.logRetentionSizeMap
   private val logRetentionMsMap = getMsMap(config.logRetentionHoursMap)
@@ -157,14 +158,13 @@ private[kafka] class LogManager(val conf
       new Log(d, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false)
     }
   }
-  
+
   /**
    * Return the Pool (partitions) for a specific log
    */
   private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
     awaitStartup
-    if (topic.length <= 0)
-      throw new InvalidTopicException("topic name can't be empty")
+    topicNameValidator.validate(topic)
     if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
       warn("Wrong partition " + partition + " valid partitions (0," +
               (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1381853&r1=1381852&r2=1381853&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Sep  7 03:48:50 2012
@@ -102,4 +102,7 @@ class KafkaConfig(props: Properties) ext
 
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
   val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+
+  /* the maximum length of topic name*/
+  val maxTopicNameLength = Utils.getIntInRange(props, "max.topic.name.length", 255, (1, Int.MaxValue))
 }

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala?rev=1381853&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala Fri Sep  7 03:48:50 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import util.matching.Regex
+import kafka.server.KafkaConfig
+
+class TopicNameValidator(config: KafkaConfig) {
+  private val illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' +
+                          '\uD800' + "-" + '\uF8FF' + '\uFFF0' + "-" + '\uFFFF'
+  // Regex checks for illegal chars and "." and ".." filenames
+  private val rgx = new Regex("(^\\.{1,2}$)|[" + illegalChars + "]")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > config.maxTopicNameLength)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + config.maxTopicNameLength + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) => throw new InvalidTopicException("topic name " + topic + " is illegal, doesn't match expected regular expression")
+      case None =>
+    }
+  }
+}

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1381853&r1=1381852&r2=1381853&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Sep  7 03:48:50 2012
@@ -23,7 +23,8 @@ import kafka.server.KafkaConfig
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils.{Utils, MockTime, TestUtils}
-import kafka.common.OffsetOutOfRangeException
+import kafka.common.{InvalidTopicException, OffsetOutOfRangeException}
+import collection.mutable.ArrayBuffer
 
 class LogManagerTest extends JUnitSuite {
 
@@ -71,6 +72,30 @@ class LogManagerTest extends JUnitSuite 
   }
 
   @Test
+  def testInvalidTopicName() {
+    val invalidTopicNames = new ArrayBuffer[String]()
+    invalidTopicNames += ("", ".", "..")
+    var longName = "ATCG"
+    for (i <- 3 to 8)
+      longName += longName
+    invalidTopicNames += longName
+    val badChars = Array('/', '\u0000', '\u0001', '\u0018', '\u001F', '\u008F', '\uD805', '\uFFFA')
+    for (weirdChar <- badChars) {
+      invalidTopicNames += "Is" + weirdChar + "funny"
+    }
+
+    for (i <- 0 until invalidTopicNames.size) {
+      try {
+        logManager.getOrCreateLog(invalidTopicNames(i), 0)
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => "This is good."
+      }
+    }
+  }
+
+  @Test
   def testCleanupExpiredSegments() {
     val log = logManager.getOrCreateLog("cleanup", 0)
     var offset = 0L