You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/27 03:33:54 UTC
svn commit: r1390800 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/admin/ main/scala/kafka/server/ main/scala/kafka/tools/
main/scala/kafka/utils/ test/scala/unit/kafka/utils/
Author: junrao
Date: Thu Sep 27 01:33:54 2012
New Revision: 1390800
URL: http://svn.apache.org/viewvc?rev=1390800&view=rev
Log:
restrict topic names (reopened); patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-495
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Thu Sep 27 01:33:54 2012
@@ -51,11 +51,6 @@ object CreateTopicCommand extends Loggin
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
.ofType(classOf[String])
.defaultsTo("")
- val maxTopicNameLenOpt = parser.accepts("max-name-len", "maximum length of the topic name")
- .withRequiredArg
- .describedAs("max topic name length")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(Topic.maxNameLength)
val options = parser.parse(args : _*)
@@ -68,7 +63,6 @@ object CreateTopicCommand extends Loggin
}
val topic = options.valueOf(topicOpt)
- val maxTopicNameLength = options.valueOf(maxTopicNameLenOpt).intValue
val zkConnect = options.valueOf(zkConnectOpt)
val nPartitions = options.valueOf(nPartitionsOpt).intValue
val replicationFactor = options.valueOf(replicationFactorOpt).intValue
@@ -76,8 +70,7 @@ object CreateTopicCommand extends Loggin
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
- val topicNameValidator = new TopicNameValidator(maxTopicNameLength)
- createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr, topicNameValidator)
+ createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
println("creation succeeded!")
} catch {
case e =>
@@ -89,9 +82,8 @@ object CreateTopicCommand extends Loggin
}
}
- def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "",
- topicNameValidator: TopicNameValidator = new TopicNameValidator(Topic.maxNameLength)) {
- topicNameValidator.validate(topic)
+ def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+ Topic.validate(topic)
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Sep 27 01:33:54 2012
@@ -22,7 +22,7 @@ import kafka.admin.{CreateTopicCommand,
import kafka.api._
import kafka.message._
import kafka.network._
-import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging}
+import kafka.utils.{Pool, SystemTime, Logging}
import org.apache.log4j.Logger
import scala.collection._
import mutable.HashMap
@@ -44,7 +44,6 @@ class KafkaApis(val requestChannel: Requ
private val producerRequestPurgatory = new ProducerRequestPurgatory
private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
private val delayedRequestMetrics = new DelayedRequestMetrics
- private val topicNameValidator = new TopicNameValidator(replicaManager.config.maxTopicNameLength)
private val requestLogger = Logger.getLogger("kafka.request.logger")
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
@@ -389,8 +388,7 @@ class KafkaApis(val requestChannel: Requ
try {
/* check if auto creation of topics is turned on */
if (config.autoCreateTopics) {
- CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor,
- topicNameValidator = topicNameValidator)
+ CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topic, config.numPartitions, config.defaultReplicationFactor))
val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Thu Sep 27 01:33:54 2012
@@ -112,9 +112,6 @@ class KafkaConfig private (val props: Ve
/* enable auto creation of topic on the server */
val autoCreateTopics = props.getBoolean("auto.create.topics", true)
- /* the maximum length of topic name*/
- val maxTopicNameLength = props.getIntInRange("max.topic.name.length", Topic.maxNameLength, (1, Int.MaxValue))
-
/**
* Following properties are relevant to Kafka replication
*/
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java Thu Sep 27 01:33:54 2012
@@ -17,6 +17,15 @@
package kafka.tools;
+import joptsimple.*;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.message.Message;
+import kafka.producer.ProducerConfig;
+import kafka.utils.Utils;
+import scala.collection.Iterator;
+import scala.collection.JavaConversions;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -28,18 +37,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-import joptsimple.ArgumentAcceptingOptionSpec;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import joptsimple.OptionSpecBuilder;
-import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
-import kafka.message.Message;
-import kafka.producer.ProducerConfig;
-import kafka.utils.Utils;
-import scala.collection.JavaConversions;
-import scala.collection.Iterator;
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala Thu Sep 27 01:33:54 2012
@@ -21,24 +21,21 @@ import kafka.common.InvalidTopicExceptio
import util.matching.Regex
object Topic {
+ val legalChars = "[a-zA-Z0-9_-]"
val maxNameLength = 255
- val illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' +
- '\uD800' + "-" + '\uF8FF' + '\uFFF0' + "-" + '\uFFFF'
-}
-
-class TopicNameValidator(maxLen: Int) {
- // Regex checks for illegal chars and "." and ".." filenames
- private val rgx = new Regex("(^\\.{1,2}$)|[" + Topic.illegalChars + "]")
+ private val rgx = new Regex(legalChars + "+")
def validate(topic: String) {
if (topic.length <= 0)
throw new InvalidTopicException("topic name is illegal, can't be empty")
- else if (topic.length > maxLen)
- throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxLen + " characters")
+ else if (topic.length > maxNameLength)
+ throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
rgx.findFirstIn(topic) match {
- case Some(t) => throw new InvalidTopicException("topic name " + topic + " is illegal, doesn't match expected regular expression")
- case None =>
+ case Some(t) =>
+ if (!t.equals(topic))
+ throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+ case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala Thu Sep 27 01:33:54 2012
@@ -32,21 +32,30 @@ class TopicTest {
for (i <- 1 to 6)
longName += longName
invalidTopicNames += longName
- val badChars = Array('/', '\u0000', '\u0001', '\u0018', '\u001F', '\u008F', '\uD805', '\uFFFA')
+ val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.')
for (weirdChar <- badChars) {
invalidTopicNames += "Is" + weirdChar + "funny"
}
- val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
-
for (i <- 0 until invalidTopicNames.size) {
try {
- topicNameValidator.validate(invalidTopicNames(i))
+ Topic.validate(invalidTopicNames(i))
fail("Should throw InvalidTopicException.")
}
catch {
case e: InvalidTopicException => "This is good."
}
}
+
+ val validTopicNames = new ArrayBuffer[String]()
+ validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_")
+ for (i <- 0 until validTopicNames.size) {
+ try {
+ Topic.validate(validTopicNames(i))
+ }
+ catch {
+ case e: Exception => fail("Should not throw exception.")
+ }
+ }
}
}