You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/27 03:33:54 UTC

svn commit: r1390800 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/server/ main/scala/kafka/tools/ main/scala/kafka/utils/ test/scala/unit/kafka/utils/

Author: junrao
Date: Thu Sep 27 01:33:54 2012
New Revision: 1390800

URL: http://svn.apache.org/viewvc?rev=1390800&view=rev
Log:
restrict topic names (reopened); patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-495

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Thu Sep 27 01:33:54 2012
@@ -51,11 +51,6 @@ object CreateTopicCommand extends Loggin
                                         "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
                            .ofType(classOf[String])
                            .defaultsTo("")
-    val maxTopicNameLenOpt = parser.accepts("max-name-len", "maximum length of the topic name")
-                           .withRequiredArg
-                           .describedAs("max topic name length")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(Topic.maxNameLength)
 
     val options = parser.parse(args : _*)
 
@@ -68,7 +63,6 @@ object CreateTopicCommand extends Loggin
     }
 
     val topic = options.valueOf(topicOpt)
-    val maxTopicNameLength = options.valueOf(maxTopicNameLenOpt).intValue
     val zkConnect = options.valueOf(zkConnectOpt)
     val nPartitions = options.valueOf(nPartitionsOpt).intValue
     val replicationFactor = options.valueOf(replicationFactorOpt).intValue
@@ -76,8 +70,7 @@ object CreateTopicCommand extends Loggin
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      val topicNameValidator = new TopicNameValidator(maxTopicNameLength)
-      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr, topicNameValidator)
+      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
       println("creation succeeded!")
     } catch {
       case e =>
@@ -89,9 +82,8 @@ object CreateTopicCommand extends Loggin
     }
   }
 
-  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "",
-                  topicNameValidator: TopicNameValidator = new TopicNameValidator(Topic.maxNameLength)) {
-    topicNameValidator.validate(topic)
+  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+    Topic.validate(topic)
 
     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Sep 27 01:33:54 2012
@@ -22,7 +22,7 @@ import kafka.admin.{CreateTopicCommand, 
 import kafka.api._
 import kafka.message._
 import kafka.network._
-import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging}
+import kafka.utils.{Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
@@ -44,7 +44,6 @@ class KafkaApis(val requestChannel: Requ
   private val producerRequestPurgatory = new ProducerRequestPurgatory
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val delayedRequestMetrics = new DelayedRequestMetrics
-  private val topicNameValidator = new TopicNameValidator(replicaManager.config.maxTopicNameLength)
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
@@ -389,8 +388,7 @@ class KafkaApis(val requestChannel: Requ
             try {
               /* check if auto creation of topics is turned on */
               if (config.autoCreateTopics) {
-                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor,
-                                               topicNameValidator = topicNameValidator)
+                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
                 info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                              .format(topic, config.numPartitions, config.defaultReplicationFactor))
                 val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Thu Sep 27 01:33:54 2012
@@ -112,9 +112,6 @@ class KafkaConfig private (val props: Ve
   /* enable auto creation of topic on the server */
   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
 
-  /* the maximum length of topic name*/
-  val maxTopicNameLength = props.getIntInRange("max.topic.name.length", Topic.maxNameLength, (1, Int.MaxValue))
-
   /**
    * Following properties are relevant to Kafka replication
    */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java Thu Sep 27 01:33:54 2012
@@ -17,6 +17,15 @@
 
 package kafka.tools;
 
+import joptsimple.*;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.message.Message;
+import kafka.producer.ProducerConfig;
+import kafka.utils.Utils;
+import scala.collection.Iterator;
+import scala.collection.JavaConversions;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -28,18 +37,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import joptsimple.ArgumentAcceptingOptionSpec;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import joptsimple.OptionSpecBuilder;
-import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
-import kafka.message.Message;
-import kafka.producer.ProducerConfig;
-import kafka.utils.Utils;
-import scala.collection.JavaConversions;
-import scala.collection.Iterator;
 
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala Thu Sep 27 01:33:54 2012
@@ -21,24 +21,21 @@ import kafka.common.InvalidTopicExceptio
 import util.matching.Regex
 
 object Topic {
+  val legalChars = "[a-zA-Z0-9_-]"
   val maxNameLength = 255
-  val illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' +
-                     '\uD800' + "-" + '\uF8FF' + '\uFFF0' + "-" + '\uFFFF'
-}
-
-class TopicNameValidator(maxLen: Int) {
-  // Regex checks for illegal chars and "." and ".." filenames
-  private val rgx = new Regex("(^\\.{1,2}$)|[" + Topic.illegalChars + "]")
+  private val rgx = new Regex(legalChars + "+")
 
   def validate(topic: String) {
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name is illegal, can't be empty")
-    else if (topic.length > maxLen)
-      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxLen + " characters")
+    else if (topic.length > maxNameLength)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
 
     rgx.findFirstIn(topic) match {
-      case Some(t) => throw new InvalidTopicException("topic name " + topic + " is illegal, doesn't match expected regular expression")
-      case None =>
+      case Some(t) =>
+        if (!t.equals(topic))
+          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidTopicException("topic name " + topic + " is illegal,  contains a character other than ASCII alphanumerics, _ and -")
     }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala?rev=1390800&r1=1390799&r2=1390800&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala Thu Sep 27 01:33:54 2012
@@ -32,21 +32,30 @@ class TopicTest {
     for (i <- 1 to 6)
       longName += longName
     invalidTopicNames += longName
-    val badChars = Array('/', '\u0000', '\u0001', '\u0018', '\u001F', '\u008F', '\uD805', '\uFFFA')
+    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.')
     for (weirdChar <- badChars) {
       invalidTopicNames += "Is" + weirdChar + "funny"
     }
 
-    val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
-
     for (i <- 0 until invalidTopicNames.size) {
       try {
-        topicNameValidator.validate(invalidTopicNames(i))
+        Topic.validate(invalidTopicNames(i))
         fail("Should throw InvalidTopicException.")
       }
       catch {
         case e: InvalidTopicException => "This is good."
       }
     }
+
+    val validTopicNames = new ArrayBuffer[String]()
+    validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_")
+    for (i <- 0 until validTopicNames.size) {
+      try {
+        Topic.validate(validTopicNames(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
   }
 }