Posted to commits@kafka.apache.org by jk...@apache.org on 2013/03/09 00:09:39 UTC

[1/2] KAFKA-554 Dynamic per-topic configuration. This patch adds a mechanism for storing per-topic configurations in ZooKeeper and dynamically applying config changes across the cluster. Reviewed by Neha and Jun.

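For context, a minimal sketch of how the new per-topic configuration mechanism is used, mirroring the calls exercised in the updated AdminTest below. The ZooKeeper connect string, topic name, and override values are illustrative assumptions, not part of the patch:

    import java.util.Properties
    import org.I0Itec.zkclient.ZkClient
    import kafka.admin.AdminUtils
    import kafka.log.LogConfig
    import kafka.utils.ZKStringSerializer

    // Connect to the cluster's ZooKeeper (address assumed for this sketch).
    val zkClient = new ZkClient("localhost:2181", 6000, 30000, ZKStringSerializer)

    // Topic-level overrides are passed as Properties and stored in ZooKeeper.
    val overrides = new Properties()
    overrides.setProperty(LogConfig.MaxMessageBytesProp, "1024")

    // Create the topic with 3 partitions, replication factor 1, and the overrides.
    AdminUtils.createTopic(zkClient, "my-topic", 3, 1, overrides)

    // Later, write new overrides; brokers pick up the change dynamically.
    overrides.setProperty(LogConfig.MaxMessageBytesProp, "2048")
    AdminUtils.changeTopicConfig(zkClient, "my-topic", overrides)
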
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 8e027b2..b73e5d4 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -19,6 +19,9 @@ package kafka.admin
 import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.utils._
+import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{Logging, ZkUtils, TestUtils}
@@ -32,28 +35,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val brokerList = List(0, 1, 2, 3, 4)
 
     // test 0 replication factor
-    try {
+    intercept[AdminOperationException] {
       AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
-      fail("shouldn't allow replication factor 0")
-    }
-    catch {
-      case e: AdministrationException => // this is good
-      case e2 => throw e2
     }
 
     // test wrong replication factor
-    try {
+    intercept[AdminOperationException] {
       AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
-      fail("shouldn't allow replication factor larger than # of brokers")
-    }
-    catch {
-      case e: AdministrationException => // this is good
-      case e2 => throw e2
     }
 
     // correct assignment
-    {
-      val expectedAssignment = Map(
+    val expectedAssignment = Map(
         0 -> List(0, 1, 2),
         1 -> List(1, 2, 3),
         2 -> List(2, 3, 4),
@@ -63,65 +55,34 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
         6 -> List(1, 3, 4),
         7 -> List(2, 4, 0),
         8 -> List(3, 0, 1),
-        9 -> List(4, 1, 2)
-      )
+        9 -> List(4, 1, 2))
 
-      val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
-      val e = (expectedAssignment.toList == actualAssignment.toList)
-      assertTrue(expectedAssignment.toList == actualAssignment.toList)
-    }
+    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
+    val e = (expectedAssignment.toList == actualAssignment.toList)
+    assertTrue(expectedAssignment.toList == actualAssignment.toList)
   }
 
   @Test
   def testManualReplicaAssignment() {
-    val brokerList = Set(0, 1, 2, 3, 4)
-
-    // duplicated brokers
-    try {
-      val replicationAssignmentStr = "0,0,1:1,2,3"
-      CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
-      fail("replication assginment shouldn't have duplicated brokers")
-    }
-    catch {
-      case e: AdministrationException => // this is good
-      case e2 => throw e2
-    }
+    val brokers = List(0, 1, 2, 3, 4)
+    TestUtils.createBrokersInZk(zkClient, brokers)
 
-    // non-exist brokers
-    try {
-      val replicationAssignmentStr = "0,1,2:1,2,7"
-      CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
-      fail("replication assginment shouldn't contain non-exist brokers")
-    }
-    catch {
-      case e: AdministrationException => // this is good
-      case e2 => throw e2
+    // duplicate brokers
+    intercept[IllegalArgumentException] {
+      AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,0)))
     }
 
     // inconsistent replication factor
-    try {
-      val replicationAssignmentStr = "0,1,2:1,2"
-      CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
-      fail("all partitions should have the same replication factor")
-    }
-    catch {
-      case e: AdministrationException => // this is good
-      case e2 => throw e2
+    intercept[IllegalArgumentException] {
+      AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0)))
     }
 
     // good assignment
-    {
-      val replicationAssignmentStr = "0:1:2,1:2:3"
-      val expectedReplicationAssignment = Map(
-        0 -> List(0, 1, 2),
-        1 -> List(1, 2, 3)
-      )
-      val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
-      assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size)
-      for( (part, replicas) <- expectedReplicationAssignment ) {
-        assertEquals(replicas, actualReplicationAssignment(part))
-      }
-    }
+    val assignment = Map(0 -> List(0, 1, 2),
+                         1 -> List(1, 2, 3))
+    AdminUtils.createTopicWithAssignment(zkClient, "test", assignment)               
+    val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test"))
+    assertEquals(assignment, found("test"))
   }
 
   @Test
@@ -157,7 +118,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val topic = "test"
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
@@ -166,12 +127,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     for(i <- 0 until actualReplicaList.size)
       assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
 
-    try {
-      AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-      fail("shouldn't be able to create a topic already exists")
-    } catch {
-      case e: TopicExistsException => // this is good
-      case e2 => throw e2
+    intercept[TopicExistsException] {
+      // shouldn't be able to create a topic that already exists
+      AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
     }
   }
 
@@ -179,15 +137,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   def testGetTopicMetadata() {
     val expectedReplicaAssignment = Map(
       0 -> List(0, 1, 2),
-      1 -> List(1, 2, 3)
-    )
+      1 -> List(1, 2, 3))
     val leaderForPartitionMap = Map(
       0 -> 0,
-      1 -> 1
-    )
+      1 -> 1)
     val topic = "auto-topic"
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
 
@@ -215,7 +171,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
     val newReplicas = Seq(0, 2, 3)
     val partitionToBeReassigned = 0
@@ -240,7 +196,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
     val newReplicas = Seq(1, 2, 3)
     val partitionToBeReassigned = 0
@@ -266,7 +222,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
@@ -307,7 +263,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
     // put the partition in the reassigned path as well
     // reassign partition 0
     val newReplicas = Seq(0, 1)
@@ -346,7 +302,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // broker 2 should be the leader since it was started first
     val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
@@ -367,7 +323,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
 
     // broker 2 should be the leader since it was started first
@@ -404,6 +360,50 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       servers.foreach(_.shutdown())
     }
   }
+  
+  /**
+   * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic,
+   * then changes the config and checks that the new values take effect.
+   */
+  @Test
+  def testTopicConfigChange() {
+    val partitions = 3
+    val topic = "my-topic"
+    val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+    
+    def makeConfig(messageSize: Int, retentionMs: Long) = {
+      var props = new Properties()
+      props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
+      props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
+      props
+    }
+    
+    def checkConfig(messageSize: Int, retentionMs: Long) {
+      TestUtils.retry(10000) {
+        for(part <- 0 until partitions) {
+          val logOpt = server.logManager.getLog(TopicAndPartition(topic, part))
+          assertTrue(logOpt.isDefined)
+          assertEquals(retentionMs, logOpt.get.config.retentionMs)
+          assertEquals(messageSize, logOpt.get.config.maxMessageSize)
+        }
+      }
+    }
+    
+    try {
+      // create a topic with a few config overrides and check that they are applied
+      val maxMessageSize = 1024
+      val retentionMs = 1000*1000
+      AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
+      checkConfig(maxMessageSize, retentionMs)
+      
+      // now double the config values for the topic and check that it is applied
+      AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
+      checkConfig(2*maxMessageSize, 2 * retentionMs)
+    } finally {
+      server.shutdown()
+      server.config.logDirs.map(Utils.rm(_))
+    }
+  }
 
   private def checkIfReassignPartitionPathExists(): Boolean = {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)

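A note on the test idiom used throughout these changes: the old try { ...; fail(...) } catch { ... } blocks are replaced with ScalaTest's intercept assertion, which is already available to these JUnit3Suite-based tests. intercept runs the block, fails the test unless an exception of the expected type is thrown, and returns the caught exception. A minimal sketch mirroring the replica-assignment check above:

    // Fails the test unless the block throws AdminOperationException;
    // the thrown exception is returned for optional further inspection.
    val thrown = intercept[AdminOperationException] {
      AdminUtils.assignReplicasToBrokers(List(0, 1, 2, 3, 4), 10, 0)
    }
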
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 8ae30ea..fec17aa 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -20,6 +20,7 @@ package kafka.consumer
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
+import java.util.Properties
 import scala.collection._
 import junit.framework.Assert._
 
@@ -27,7 +28,7 @@ import kafka.message._
 import kafka.server._
 import kafka.utils.TestUtils._
 import kafka.utils._
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import org.junit.Test
 import kafka.serializer._
 import kafka.cluster.{Broker, Cluster}
@@ -60,7 +61,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
 
   override def setUp() {
     super.setUp
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index f7ee914..c70a435 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -26,7 +26,7 @@ import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer._
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
@@ -298,7 +298,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
 
     // create topic topic1 with 1 partition on broker 0
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 4c646f0..c046a39 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -32,8 +32,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
   val topic = "test_topic"
   val group = "default_group"
   val testConsumer = "consumer"
-  val BrokerPort = 9892
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort)))
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
   val NumMessages = 10
   val LargeOffset = 10000
   val SmallOffset = -1

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 5a57bd1..845b966 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -30,7 +30,7 @@ import kafka.serializer._
 import kafka.producer.{KeyedMessage, Producer}
 import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -55,7 +55,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
   override def setUp() {
     super.setUp
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)))
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
     fetcher.stopAllConnections()

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 2fc08d3..1c6a01b 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -27,7 +27,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.AdminUtils
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.utils.{TestUtils, Utils}
 
@@ -42,19 +42,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
-  override def setUp() {
-    super.setUp
-    // temporarily set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.FATAL)
-  }
-
-  override def tearDown() {
-    // restore set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.ERROR)
-
-    super.tearDown
-  }
-
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
       .clientId("test-client")
@@ -299,7 +286,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
-    CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
+    AdminUtils.createTopic(zkClient, newTopic, 1, 1)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
@@ -327,10 +314,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     }
 
     // wait until the messages are published
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test1", 0).get.logEndOffset == 2 }, 1000)
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test2", 0).get.logEndOffset == 2 }, 1000)
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test3", 0).get.logEndOffset == 2 }, 1000)
-    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test4", 0).get.logEndOffset == 2 }, 1000)
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 }, 1000)
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 }, 1000)
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 }, 1000)
+    TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 }, 1000)
 
     val replicaId = servers.head.config.brokerId
     val hwWaitMs = config.replicaHighWatermarkCheckpointIntervalMs
@@ -354,7 +341,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
    */
   def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
     for( topic <- topics ) {
-      CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
+      AdminUtils.createTopic(zkClient, topic, 1, 1)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 5169aea..be94254 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -19,7 +19,7 @@ package kafka.integration
 
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import java.nio.ByteBuffer
 import junit.framework.Assert._
 import org.easymock.EasyMock
@@ -48,7 +48,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testTopicMetadataRequest {
     // create topic
     val topic = "test"
-    CreateTopicCommand.createTopic(zkClient, topic, 1)
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
 
     // create a topic metadata request
     val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
@@ -64,7 +64,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testBasicTopicMetadata {
     // create topic
     val topic = "test"
-    CreateTopicCommand.createTopic(zkClient, topic, 1)
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
     // set up leader for topic partition 0
     val leaderForPartitionMap = Map(
       0 -> configs.head.brokerId
@@ -83,7 +83,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testGetAllTopicMetadata {
     // create topic
     val topic = "test"
-    CreateTopicCommand.createTopic(zkClient, topic, 1)
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
     // set up leader for topic partition 0
     val leaderForPartitionMap = Map(
       0 -> configs.head.brokerId

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index fad3baa..6916df4 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -58,7 +58,7 @@ class LogManagerTest extends JUnit3Suite {
    */
   @Test
   def testCreateLog() {
-    val log = logManager.getOrCreateLog(name, 0)
+    val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
     val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
     log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -69,7 +69,7 @@ class LogManagerTest extends JUnit3Suite {
    */
   @Test
   def testGetNonExistentLog() {
-    val log = logManager.getLog(name, 0)
+    val log = logManager.getLog(TopicAndPartition(name, 0))
     assertEquals("No log should be found.", None, log)
     val logFile = new File(logDir, name + "-0")
     assertTrue(!logFile.exists)
@@ -80,7 +80,7 @@ class LogManagerTest extends JUnit3Suite {
    */
   @Test
   def testCleanupExpiredSegments() {
-    val log = logManager.getOrCreateLog(name, 0)
+    val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
     var offset = 0L
     for(i <- 0 until 200) {
       var set = TestUtils.singleMessageSet("test".getBytes())
@@ -120,7 +120,7 @@ class LogManagerTest extends JUnit3Suite {
     logManager.startup
 
     // create a log
-    val log = logManager.getOrCreateLog(name, 0)
+    val log = logManager.createLog(TopicAndPartition(name, 0), config)
     var offset = 0L
 
     // add a bunch of messages that should be larger than the retentionSize
@@ -158,7 +158,7 @@ class LogManagerTest extends JUnit3Suite {
     val config = logConfig.copy(flushMs = 1000)
     logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time)
     logManager.startup
-    val log = logManager.getOrCreateLog(name, 0)
+    val log = logManager.createLog(TopicAndPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
     for(i <- 0 until 200) {
       var set = TestUtils.singleMessageSet("test".getBytes())
@@ -182,7 +182,7 @@ class LogManagerTest extends JUnit3Suite {
     
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
-      logManager.getOrCreateLog("test", partition)
+      logManager.createLog(TopicAndPartition("test", partition), logConfig)
       assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size)
       val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size)
       assertTrue("Load should balance evenly", counts.max <= counts.min + 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 04acef5..e4b057e 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -27,7 +27,7 @@ import org.junit.Assert._
 import org.junit.Test
 import kafka.utils._
 import java.util
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.AdminUtils
 import util.Properties
 import kafka.api.FetchRequestBuilder
 import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException}
@@ -77,17 +77,15 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     // restore set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.ERROR)
     server1.shutdown
-    server1.awaitShutdown()
     server2.shutdown
-    server2.awaitShutdown()
     Utils.rm(server1.config.logDirs)
     Utils.rm(server2.config.logDirs)
     super.tearDown()
   }
 
-
+  @Test
   def testUpdateBrokerPartitionInfo() {
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+    AdminUtils.createTopic(zkClient, "new-topic", 1, 2)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@@ -152,7 +150,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     val producerConfig2 = new ProducerConfig(props2)
 
     // create topic with 1 partition and await leadership
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+    AdminUtils.createTopic(zkClient, "new-topic", 1, 2)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@@ -203,7 +201,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     // create topic
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+    AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)))
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@@ -213,13 +211,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
-    try {
-      // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
-      // on broker 0
-      producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
-    } catch {
-      case e => fail("Unexpected exception: " + e)
-    }
+    // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only on broker 0
+    producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
 
     // kill the broker
     server1.shutdown
@@ -264,7 +257,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     val producer = new Producer[String, String](config)
 
     // create topics in ZK
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+    AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0,1)))
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b5ee31d..bbf0406 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -19,7 +19,7 @@ package kafka.producer
 
 import java.net.SocketTimeoutException
 import junit.framework.Assert
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.integration.KafkaServerTestHarness
 import kafka.message._
 import kafka.server.KafkaConfig
@@ -92,7 +92,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
+    AdminUtils.createTopic(zkClient, "test", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
 
     val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
@@ -135,9 +135,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     }
 
     // #2 - test that we get correct offsets when partition is owned by broker
-    CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
+    AdminUtils.createTopic(zkClient, "topic1", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0, 500)
-    CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
+    AdminUtils.createTopic(zkClient, "topic3", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500)
 
     val response2 = producer.send(request)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 8a3e33b..9963502 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -62,7 +62,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     assertEquals(0L, fooPartition0Hw)
     val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
     // create leader and follower replicas
-    val log0 = logManagers(0).getOrCreateLog(topic, 0)
+    val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig())
     val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
     partition0.addReplicaIfNotExists(leaderReplicaPartition0)
     val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
@@ -101,7 +101,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     assertEquals(0L, topic1Partition0Hw)
     val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
     // create leader log
-    val topic1Log0 = logManagers(0).getOrCreateLog(topic1, 0)
+    val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig())
     // create a local replica for topic1
     val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
     topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
@@ -117,7 +117,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     // add another partition and set highwatermark
     val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
     // create leader log
-    val topic2Log0 = logManagers(0).getOrCreateLog(topic2, 0)
+    val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig())
     // create a local replica for topic2
     val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
     topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 129bc56..176718e 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
@@ -61,7 +61,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val partitionId = 0
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+    AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1)))
 
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -108,7 +108,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val partitionId = 0
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+    AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1)))
 
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index f857171..6801f4e 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -26,7 +26,7 @@ import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
@@ -82,10 +82,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.getOrCreateLog(topic, part)
+    val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
 
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 20)
@@ -120,7 +120,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 
     var offsetChanged = false
@@ -145,10 +145,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+    AdminUtils.createTopic(zkClient, topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.getOrCreateLog(topic, part)
+    val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 20)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -174,10 +174,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+    AdminUtils.createTopic(zkClient, topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.getOrCreateLog(topic, part)
+    val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 20)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 7430485..d2650e3 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
 import java.io.File
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.utils.TestUtils._
 import kafka.utils.IntEncoder
 import kafka.utils.{Utils, TestUtils}
@@ -53,7 +53,14 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
   producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
   producerProps.put("request.required.acks", "-1")
-
+  
+  override def tearDown() {
+    super.tearDown()
+    for(server <- servers) {
+      server.shutdown()
+      Utils.rm(server.config.logDirs(0))
+    }
+  }
 
   def testHWCheckpointNoFailuresSingleLogSegment {
     // start both servers
@@ -64,7 +71,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+    AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
 
     // wait until leader is elected
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -86,7 +93,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(numMessages, leaderHW)
     val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(numMessages, followerHW)
-    servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))})
   }
 
   def testHWCheckpointWithFailuresSingleLogSegment {
@@ -98,7 +104,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+    AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
 
     // wait until leader is elected
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -148,7 +154,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer.close()
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
-    servers.foreach(server => Utils.rm(server.config.logDirs))
   }
 
   def testHWCheckpointNoFailuresMultipleLogSegments {
@@ -163,7 +168,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+    AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
 
     // wait until leader is elected
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -182,7 +187,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(hw, leaderHW)
     val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(hw, followerHW)
-    servers.foreach(server => Utils.rm(server.config.logDirs))
   }
 
   def testHWCheckpointWithFailuresMultipleLogSegments {
@@ -197,7 +201,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+    AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(server1.config.brokerId, server2.config.brokerId)))
 
     // wait until leader is elected
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -241,7 +245,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer.close()
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
-    servers.foreach(server => Utils.rm(server.config.logDirs))
   }
 
   private def sendMessages(n: Int = 1) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 6989c95..c0475d0 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -26,7 +26,6 @@ import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
-import kafka.admin.CreateTopicCommand
 import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
 import kafka.utils.TestUtils._
 import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index d0e3590..dd85c71 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -22,9 +22,10 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringEncoder
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.utils.TestUtils
 import junit.framework.Assert._
+import kafka.common._
 
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
   val props = createBrokerConfigs(2)
@@ -50,7 +51,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
 
     // create a topic and partition and await leadership
     for (topic <- List(topic1,topic2)) {
-      CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+      AdminUtils.createTopic(zkClient, topic, 1, 2)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     }
 
@@ -65,9 +66,10 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
     def logsMatch(): Boolean = {
       var result = true
       for (topic <- List(topic1, topic2)) {
-        val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
+        val topicAndPart = TopicAndPartition(topic, partition)
+        val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
         result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
-          (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) }
+          (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) }
       }
       result
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 3728f8c..c5f39cb 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -26,7 +26,7 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.producer._
 import kafka.utils.IntEncoder
 import kafka.utils.TestUtils._
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.api.FetchRequestBuilder
 import kafka.utils.{TestUtils, Utils}
 
@@ -49,7 +49,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
 
     // create topic
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
     // send some messages
     producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 571e2df..1c6f615 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -67,7 +67,7 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
     val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
@@ -133,7 +133,7 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, 0))).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
     val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/utils/JsonTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
new file mode 100644
index 0000000..4482dab
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
@@ -0,0 +1,27 @@
+package kafka.utils
+
+import junit.framework.Assert._
+import org.junit.{Test, After, Before}
+
+class JsonTest {
+
+  @Test
+  def testJsonEncoding() {
+    assertEquals("null", Json.encode(null))
+    assertEquals("1", Json.encode(1))
+    assertEquals("1", Json.encode(1L))
+    assertEquals("1", Json.encode(1.toByte))
+    assertEquals("1", Json.encode(1.toShort))
+    assertEquals("1.0", Json.encode(1.0))
+    assertEquals("\"str\"", Json.encode("str"))
+    assertEquals("true", Json.encode(true))
+    assertEquals("false", Json.encode(false))
+    assertEquals("[]", Json.encode(Seq()))
+    assertEquals("[1,2,3]", Json.encode(Seq(1,2,3)))
+    assertEquals("[1,\"2\",[3]]", Json.encode(Seq(1,"2",Seq(3))))
+    assertEquals("{}", Json.encode(Map()))
+    assertEquals("{\"a\":1,\"b\":2}", Json.encode(Map("a" -> 1, "b" -> 2)))
+    assertEquals("{\"a\":[1,2],\"c\":[3,4]}", Json.encode(Map("a" -> Seq(1,2), "c" -> Seq(3,4))))
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index ec27ef9..b364ac2 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -76,7 +76,9 @@ class SchedulerTest {
   @Test
   def testNonPeriodicTask() {
     scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
-    retry(30000, () => assertEquals(counter1.get, 1))
+    retry(30000) {
+      assertEquals(counter1.get, 1)
+    }
     Thread.sleep(5)
     assertEquals("Should only run once", 1, counter1.get)
   }
@@ -84,6 +86,8 @@ class SchedulerTest {
   @Test
   def testPeriodicTask() {
     scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5)
-    retry(30000, () => assertTrue("Should count to 20", counter1.get >= 20))
+    retry(30000){
+      assertTrue("Should count to 20", counter1.get >= 20)
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 98901c2..40bfacb 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,6 +23,7 @@ import java.nio._
 import java.nio.channels._
 import java.util.Random
 import java.util.Properties
+import junit.framework.AssertionFailedError
 import junit.framework.Assert._
 import kafka.server._
 import kafka.producer._
@@ -122,7 +123,7 @@ object TestUtils extends Logging {
   /**
    * Create a test config for the given node id
    */
-  def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+  def createBrokerConfig(nodeId: Int, port: Int = choosePort()): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
     props.put("host.name", "localhost")
@@ -448,18 +449,20 @@ object TestUtils extends Logging {
    * Execute the given block. If it throws an assert error, retry. Repeat
    * until no error is thrown or the time limit elapses
    */
-  def retry(maxWaitMs: Long, block: () => Unit) {
+  def retry(maxWaitMs: Long)(block: => Unit) {
     var wait = 1L
     val startTime = System.currentTimeMillis()
     while(true) {
       try {
-        block()
+        block
         return
       } catch {
-        case e: AssertionError =>
-          if(System.currentTimeMillis - startTime > maxWaitMs) {
+        case e: AssertionFailedError =>
+          val ellapsed = System.currentTimeMillis - startTime 
+          if(ellapsed > maxWaitMs) {
             throw e
           } else {
+            info("Attempt failed, sleeping for " + wait + ", and then retrying.")
             Thread.sleep(wait)
             wait += math.min(wait, 1000)
           }