You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/12/18 00:54:10 UTC

[2/2] git commit: KAFKA-646 Missing files from previous commit

KAFKA-646 Missing files from previous commit


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/994fe081
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/994fe081
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/994fe081

Branch: refs/heads/0.8
Commit: 994fe081b6e429ba1cfd5c09d464015c41e64cf7
Parents: 792b071
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Mon Dec 17 15:53:12 2012 -0800
Committer: Neha Narkhede <ne...@apache.org>
Committed: Mon Dec 17 15:53:12 2012 -0800

----------------------------------------------------------------------
 .../scala/kafka/common/ClientIdAndBroker.scala     |   26 ++++
 .../main/scala/kafka/common/ClientIdAndTopic.scala |   27 +++++
 core/src/main/scala/kafka/common/Config.scala      |   40 +++++++
 core/src/main/scala/kafka/common/Topic.scala       |   40 +++++++
 .../consumer/FetchRequestAndResponseStats.scala    |   58 ++++++++++
 .../kafka/producer/ProducerRequestStats.scala      |   56 +++++++++
 .../main/scala/kafka/producer/ProducerStats.scala  |   40 +++++++
 .../scala/kafka/producer/ProducerTopicStats.scala  |   57 +++++++++
 .../test/scala/unit/kafka/common/ConfigTest.scala  |   89 +++++++++++++++
 .../test/scala/unit/kafka/common/TopicTest.scala   |   61 ++++++++++
 10 files changed, 494 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
new file mode 100644
index 0000000..93223a9
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
@@ -0,0 +1,26 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class since (clientId, brokerInfo) pairs are used to create
+ * SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
+ */
+case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
+  override def toString = "%s-%s".format(clientId, brokerInfo)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
new file mode 100644
index 0000000..7acf9e7
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
@@ -0,0 +1,27 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class since (clientId, topic) pairs are used in the creation
+ * of many Stats objects.
+ */
+case class ClientIdAndTopic(clientId: String, topic: String) {
+  override def toString = "%s-%s".format(clientId, topic)
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/common/Config.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Config.scala b/core/src/main/scala/kafka/common/Config.scala
new file mode 100644
index 0000000..53bfcfb
--- /dev/null
+++ b/core/src/main/scala/kafka/common/Config.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import util.matching.Regex
+import kafka.utils.Logging
+
+trait Config extends Logging {
+
+  def validateChars(prop: String, value: String) {
+    val legalChars = "[a-zA-Z0-9_-]"
+    val rgx = new Regex(legalChars + "*")
+
+    rgx.findFirstIn(value) match {
+      case Some(t) =>
+        if (!t.equals(value))
+          throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+    }
+  }
+}
+
+
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/common/InvalidClientIdException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InvalidClientIdException.scala b/core/src/main/scala/kafka/common/InvalidClientIdException.scala
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
new file mode 100644
index 0000000..c96ed62
--- /dev/null
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import util.matching.Regex
+
+object Topic {
+  private val legalChars = "[a-zA-Z0-9_-]"
+  private val maxNameLength = 255
+  private val rgx = new Regex(legalChars + "+")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > maxNameLength)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) =>
+        if (!t.equals(topic))
+          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidTopicException("topic name " + topic + " is illegal,  contains a character other than ASCII alphanumerics, _ and -")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
new file mode 100644
index 0000000..2cc0f36
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import kafka.utils.Pool
+import java.util.concurrent.TimeUnit
+import kafka.common.ClientIdAndBroker
+
+class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(metricId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(metricId + "-FetchResponseSize")
+}
+
+/**
+ * Tracks metrics of the requests made by a given consumer client to all brokers, and the responses obtained from the brokers.
+ * @param clientId ClientId of the given consumer
+ */
+class FetchRequestAndResponseStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
+  private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
+  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "All.Brokers"))
+
+  def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
+
+  def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
+  }
+}
+
+/**
+ * Stores the fetch request and response stats information of each consumer client in a (clientId -> FetchRequestAndResponseStats) map.
+ */
+object FetchRequestAndResponseStatsRegistry {
+  private val valueFactory = (k: String) => new FetchRequestAndResponseStats(k)
+  private val globalStats = new Pool[String, FetchRequestAndResponseStats](Some(valueFactory))
+
+  def getFetchRequestAndResponseStats(clientId: String) = {
+    globalStats.getAndMaybePut(clientId)
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
new file mode 100644
index 0000000..e29ccad
--- /dev/null
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.producer
+
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import java.util.concurrent.TimeUnit
+import kafka.utils.Pool
+import kafka.common.ClientIdAndBroker
+
+class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(metricId + "-ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(metricId + "-ProducerRequestSize")
+}
+
+/**
+ * Tracks metrics of requests made by a given producer client to all brokers.
+ * @param clientId ClientId of the given producer
+ */
+class ProducerRequestStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
+  private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
+  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "All.Brokers"))
+
+  def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
+
+  def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
+  }
+}
+
+/**
+ * Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map.
+ */
+object ProducerRequestStatsRegistry {
+  private val valueFactory = (k: String) => new ProducerRequestStats(k)
+  private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory))
+
+  def getProducerRequestStats(clientId: String) = {
+    globalStats.getAndMaybePut(clientId)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/producer/ProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
new file mode 100644
index 0000000..62ff544
--- /dev/null
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.producer
+
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
+import kafka.utils.Pool
+
+class ProducerStats(clientId: String) extends KafkaMetricsGroup {
+  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+  val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
+}
+
+/**
+ * Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map.
+ */
+object ProducerStatsRegistry {
+  private val valueFactory = (k: String) => new ProducerStats(k)
+  private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory))
+
+  def getProducerStats(clientId: String) = {
+    statsRegistry.getAndMaybePut(clientId)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
new file mode 100644
index 0000000..fd0b44e
--- /dev/null
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.producer
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.common.ClientIdAndTopic
+import kafka.utils.{Pool, threadsafe}
+import java.util.concurrent.TimeUnit
+
+
+@threadsafe
+class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(metricId + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+}
+
+/**
+ * Tracks metrics for each topic the given producer client has produced data to.
+ * @param clientId The clientId of the given producer client.
+ */
+class ProducerTopicStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
+
+  def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
+
+  def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
+  }
+}
+
+/**
+ * Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map.
+ */
+object ProducerTopicStatsRegistry {
+  private val valueFactory = (k: String) => new ProducerTopicStats(k)
+  private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory))
+
+  def getProducerTopicStats(clientId: String) = {
+    globalStats.getAndMaybePut(clientId)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/test/scala/unit/kafka/common/ConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
new file mode 100644
index 0000000..6226dda
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.common
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import org.junit.Test
+import kafka.common.InvalidConfigException
+import kafka.producer.ProducerConfig
+import kafka.consumer.ConsumerConfig
+
+class ConfigTest {
+
+  @Test
+  def testInvalidClientIds() {
+    val invalidClientIds = new ArrayBuffer[String]()
+    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
+    for (weirdChar <- badChars) {
+      invalidClientIds += "Is" + weirdChar + "illegal"
+    }
+
+    for (i <- 0 until invalidClientIds.size) {
+      try {
+        ProducerConfig.validateClientId(invalidClientIds(i))
+        fail("Should throw InvalidClientIdException.")
+      }
+      catch {
+        case e: InvalidConfigException => "This is good."
+      }
+    }
+
+    val validClientIds = new ArrayBuffer[String]()
+    validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "")
+    for (i <- 0 until validClientIds.size) {
+      try {
+        ProducerConfig.validateClientId(validClientIds(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+
+  @Test
+  def testInvalidGroupIds() {
+    val invalidGroupIds = new ArrayBuffer[String]()
+    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
+    for (weirdChar <- badChars) {
+      invalidGroupIds += "Is" + weirdChar + "illegal"
+    }
+
+    for (i <- 0 until invalidGroupIds.size) {
+      try {
+        ConsumerConfig.validateGroupId(invalidGroupIds(i))
+        fail("Should throw InvalidGroupIdException.")
+      }
+      catch {
+        case e: InvalidConfigException => "This is good."
+      }
+    }
+
+    val validGroupIds = new ArrayBuffer[String]()
+    validGroupIds += ("valid", "GROUP", "iDs", "ar6", "VaL1d", "_0-9_", "")
+    for (i <- 0 until validGroupIds.size) {
+      try {
+        ConsumerConfig.validateGroupId(validGroupIds(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/test/scala/unit/kafka/common/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala
new file mode 100644
index 0000000..b37553e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.common
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import kafka.common.{Topic, InvalidTopicException}
+import org.junit.Test
+
+class TopicTest {
+
+  @Test
+  def testInvalidTopicNames() {
+    val invalidTopicNames = new ArrayBuffer[String]()
+    invalidTopicNames += ("", ".", "..")
+    var longName = "ATCG"
+    for (i <- 1 to 6)
+      longName += longName
+    invalidTopicNames += longName
+    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.')
+    for (weirdChar <- badChars) {
+      invalidTopicNames += "Is" + weirdChar + "illegal"
+    }
+
+    for (i <- 0 until invalidTopicNames.size) {
+      try {
+        Topic.validate(invalidTopicNames(i))
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => "This is good."
+      }
+    }
+
+    val validTopicNames = new ArrayBuffer[String]()
+    validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_")
+    for (i <- 0 until validTopicNames.size) {
+      try {
+        Topic.validate(validTopicNames(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/994fe081/core/src/test/scala/unit/kafka/utils/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TopicTest.scala b/core/src/test/scala/unit/kafka/utils/TopicTest.scala
deleted file mode 100644
index e69de29..0000000