You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/07 05:48:51 UTC
svn commit: r1381853 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/log/LogManager.scala
main/scala/kafka/server/KafkaConfig.scala
main/scala/kafka/utils/TopicNameValidator.scala
test/scala/unit/kafka/log/LogManagerTest.scala
Author: junrao
Date: Fri Sep 7 03:48:50 2012
New Revision: 1381853
URL: http://svn.apache.org/viewvc?rev=1381853&view=rev
Log:
Handle topic names with / on Kafka server; patched by Swapnil Ghike; reviewed by Jay Kreps and Jun Rao; kafka-495
Added:
incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1381853&r1=1381852&r2=1381853&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Fri Sep 7 03:48:50 2012
@@ -49,6 +49,7 @@ private[kafka] class LogManager(val conf
private var zkActor: Actor = null
private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
+ private val topicNameValidator = new TopicNameValidator(config)
private val logFlushIntervalMap = config.flushIntervalMap
private val logRetentionSizeMap = config.logRetentionSizeMap
private val logRetentionMsMap = getMsMap(config.logRetentionHoursMap)
@@ -157,14 +158,13 @@ private[kafka] class LogManager(val conf
new Log(d, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false)
}
}
-
+
/**
* Return the Pool (partitions) for a specific log
*/
private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
awaitStartup
- if (topic.length <= 0)
- throw new InvalidTopicException("topic name can't be empty")
+ topicNameValidator.validate(topic)
if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
warn("Wrong partition " + partition + " valid partitions (0," +
(topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1381853&r1=1381852&r2=1381853&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Sep 7 03:48:50 2012
@@ -102,4 +102,7 @@ class KafkaConfig(props: Properties) ext
/* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+
+ /* the maximum length of topic name*/
+ val maxTopicNameLength = Utils.getIntInRange(props, "max.topic.name.length", 255, (1, Int.MaxValue))
}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala?rev=1381853&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/TopicNameValidator.scala Fri Sep 7 03:48:50 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import util.matching.Regex
+import kafka.server.KafkaConfig
+
+class TopicNameValidator(config: KafkaConfig) {
+ private val illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' +
+ '\uD800' + "-" + '\uF8FF' + '\uFFF0' + "-" + '\uFFFF'
+ // Regex checks for illegal chars and "." and ".." filenames
+ private val rgx = new Regex("(^\\.{1,2}$)|[" + illegalChars + "]")
+
+ def validate(topic: String) {
+ if (topic.length <= 0)
+ throw new InvalidTopicException("topic name is illegal, can't be empty")
+ else if (topic.length > config.maxTopicNameLength)
+ throw new InvalidTopicException("topic name is illegal, can't be longer than " + config.maxTopicNameLength + " characters")
+
+ rgx.findFirstIn(topic) match {
+ case Some(t) => throw new InvalidTopicException("topic name " + topic + " is illegal, doesn't match expected regular expression")
+ case None =>
+ }
+ }
+}
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1381853&r1=1381852&r2=1381853&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Sep 7 03:48:50 2012
@@ -23,7 +23,8 @@ import kafka.server.KafkaConfig
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.utils.{Utils, MockTime, TestUtils}
-import kafka.common.OffsetOutOfRangeException
+import kafka.common.{InvalidTopicException, OffsetOutOfRangeException}
+import collection.mutable.ArrayBuffer
class LogManagerTest extends JUnitSuite {
@@ -71,6 +72,30 @@ class LogManagerTest extends JUnitSuite
}
@Test
+ def testInvalidTopicName() {
+ val invalidTopicNames = new ArrayBuffer[String]()
+ invalidTopicNames += ("", ".", "..")
+ var longName = "ATCG"
+ for (i <- 3 to 8)
+ longName += longName
+ invalidTopicNames += longName
+ val badChars = Array('/', '\u0000', '\u0001', '\u0018', '\u001F', '\u008F', '\uD805', '\uFFFA')
+ for (weirdChar <- badChars) {
+ invalidTopicNames += "Is" + weirdChar + "funny"
+ }
+
+ for (i <- 0 until invalidTopicNames.size) {
+ try {
+ logManager.getOrCreateLog(invalidTopicNames(i), 0)
+ fail("Should throw InvalidTopicException.")
+ }
+ catch {
+ case e: InvalidTopicException => "This is good."
+ }
+ }
+ }
+
+ @Test
def testCleanupExpiredSegments() {
val log = logManager.getOrCreateLog("cleanup", 0)
var offset = 0L