Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:48 UTC

[03/36] git commit: kafka-347; change number of partitions of a topic online; patched by Sriram Subramanian; reviewed by Neha Narkhede, Guozhang Wang, Joel Koshy and Jun Rao

kafka-347; change number of partitions of a topic online; patched by Sriram Subramanian; reviewed by Neha Narkhede, Guozhang Wang, Joel Koshy and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3817857b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3817857b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3817857b

Branch: refs/heads/trunk
Commit: 3817857b15f9ce03f10a9730f0ff4619d728b06f
Parents: b1891e7
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Mon Jul 22 21:33:28 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jul 22 21:33:28 2013 -0700

----------------------------------------------------------------------
 bin/kafka-add-partitions.sh                     |  19 ++
 .../kafka/admin/AddPartitionsCommand.scala      | 127 ++++++++++
 .../src/main/scala/kafka/admin/AdminUtils.scala |  27 +-
 .../scala/kafka/admin/CreateTopicCommand.scala  |   2 +-
 .../kafka/controller/KafkaController.scala      |   2 +
 .../controller/PartitionStateMachine.scala      |  28 ++-
 .../unit/kafka/admin/AddPartitionsTest.scala    | 251 +++++++++++++++++++
 .../test/scala/unit/kafka/admin/AdminTest.scala |  16 +-
 8 files changed, 448 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
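
The new tool can be driven from the shell wrapper added below (for example: bin/kafka-add-partitions.sh --topic my-topic --partition 2 --zookeeper localhost:2181) or programmatically through AddPartitionsCommand. The following is a minimal, hypothetical Scala sketch of the programmatic path, mirroring what AddPartitionsCommand.main does in this patch; the topic name and ZooKeeper connect string are placeholders.

    import org.I0Itec.zkclient.ZkClient
    import kafka.admin.AddPartitionsCommand
    import kafka.utils.ZKStringSerializer

    val zkConnect = "localhost:2181"   // placeholder ZooKeeper connect string
    val topic = "my-topic"             // placeholder topic; it must already exist

    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
    try {
      // Add two partitions and let the tool pick the replica placement.
      AddPartitionsCommand.addPartitions(zkClient, topic, numPartitions = 2)
      // Alternatively, pin the new partitions' replicas explicitly,
      // one colon-separated broker list per new partition:
      // AddPartitionsCommand.addPartitions(zkClient, topic, 2, "0:1,2:3")
    } finally {
      zkClient.close()
    }

As the tool's own warning below notes, this should only be used when the topic's data is not keyed, since adding partitions changes the key-to-partition mapping.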


http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/bin/kafka-add-partitions.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-add-partitions.sh b/bin/kafka-add-partitions.sh
new file mode 100755
index 0000000..7d217e2
--- /dev/null
+++ b/bin/kafka-add-partitions.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.AddPartitionsCommand $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
new file mode 100644
index 0000000..5757c32
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import scala.collection.mutable
+import kafka.common.TopicAndPartition
+
+object AddPartitionsCommand extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need to be added.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+      "Multiple URLs can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+    val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add to the topic")
+      .withRequiredArg
+      .describedAs("# of partitions")
+      .ofType(classOf[java.lang.Integer])
+    val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers for the new partitions")
+      .withRequiredArg
+      .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
+      "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
+      .ofType(classOf[String])
+      .defaultsTo("")
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("***Please note that this tool can only be used to add partitions when data for a topic does not use a key.***\n" +
+          "Missing required argument. " + " \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val topic = options.valueOf(topicOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val nPartitions = options.valueOf(nPartitionsOpt).intValue
+    val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+      println("adding partitions succeeded!")
+    } catch {
+      case e =>
+        println("adding partitions failed because of " + e.getMessage)
+        println(Utils.stackTrace(e))
+    } finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
+    val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+    if (existingPartitionsReplicaList.size == 0)
+      throw new AdministrationException("The topic %s does not exist".format(topic))
+
+    val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get
+
+    // create the new partition replication list
+    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+    val newPartitionReplicaList = if (replicaAssignmentStr == "")
+      AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
+    else
+      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
+
+    // check if manual assignment has the right replication factor
+    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
+    if (unmatchedRepFactorList.size != 0)
+      throw new AdministrationException("The replication factor in manual replication assignment " +
+        " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
+
+    info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
+    val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
+    // add the new list
+    partitionReplicaList ++= newPartitionReplicaList
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true)
+  }
+
+  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
+    val partitionList = replicaAssignmentList.split(",")
+    val ret = new mutable.HashMap[Int, List[Int]]()
+    var partitionId = startPartitionId
+    for (i <- 0 until partitionList.size) {
+      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+      if (brokerList.size <= 0)
+        throw new AdministrationException("replication factor must be larger than 0")
+      if (brokerList.size != brokerList.toSet.size)
+        throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList)
+      if (!brokerList.toSet.subsetOf(availableBrokerList))
+        throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString +
+          "available broker:" + availableBrokerList.toString)
+      ret.put(partitionId, brokerList.toList)
+      if (ret(partitionId).size != ret(startPartitionId).size)
+        throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
+      partitionId = partitionId + 1
+    }
+    ret.toMap
+  }
+}
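
To make the replica-assignment-list format concrete, here is a small, hypothetical illustration of getManualReplicaAssignment; the broker IDs and starting partition ID are made up, and the expected result follows directly from the parsing code above.

    // Assume brokers 0..3 exist and the topic currently has one partition (ID 0),
    // so the first newly added partition gets ID 1.
    val assignment = AddPartitionsCommand.getManualReplicaAssignment(
      "0:1,2:3",          // two new partitions, replication factor 2 each
      Set(0, 1, 2, 3),    // available broker IDs
      startPartitionId = 1)
    // assignment == Map(1 -> List(0, 1), 2 -> List(2, 3))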

http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 41cb764..c399bc7 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -48,7 +48,7 @@ object AdminUtils extends Logging {
   * p7        p8        p9        p5        p6       (3rd replica)
    */
   def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
-                              fixedStartIndex: Int = -1)  // for testing only
+                              fixedStartIndex: Int = -1, startPartitionId: Int = -1)
   : Map[Int, Seq[Int]] = {
     if (nPartitions <= 0)
       throw new AdministrationException("number of partitions must be larger than 0")
@@ -59,25 +59,34 @@ object AdminUtils extends Logging {
         " larger than available brokers: " + brokerList.size)
     val ret = new mutable.HashMap[Int, List[Int]]()
     val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+    var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
 
-    var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
     for (i <- 0 until nPartitions) {
-      if (i > 0 && (i % brokerList.size == 0))
-        secondReplicaShift += 1
-      val firstReplicaIndex = (i + startIndex) % brokerList.size
+      if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
+        nextReplicaShift += 1
+      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
       var replicaList = List(brokerList(firstReplicaIndex))
       for (j <- 0 until replicationFactor - 1)
-        replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
-      ret.put(i, replicaList.reverse)
+        replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
+      ret.put(currentPartitionId, replicaList.reverse)
+      currentPartitionId = currentPartitionId + 1
     }
     ret.toMap
   }
 
-  def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) {
+  def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient, update: Boolean = false) {
     try {
       val zkPath = ZkUtils.getTopicPath(topic)
       val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
-      ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
+
+      if (!update) {
+        info("Topic creation " + jsonPartitionData.toString)
+        ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
+      } else {
+        info("Topic update " + jsonPartitionData.toString)
+        ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData)
+      }
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
       case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
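
For context, AddPartitionsCommand uses the new startPartitionId parameter so that partition IDs for a topic that already has N partitions continue from N instead of restarting at 0, and it passes the head replica of the existing partition 0 as fixedStartIndex so placement stays aligned with the original assignment. A rough, hypothetical sketch of such a call (the broker list and counts are made up):

    // Topic already has one partition (ID 0) with replication factor 2; add two more.
    val brokerList = Seq(0, 1, 2, 3)
    val newAssignment = AdminUtils.assignReplicasToBrokers(
      brokerList,
      nPartitions = 2,        // number of partitions being added
      replicationFactor = 2,
      fixedStartIndex = 0,    // head replica of the existing partition 0
      startPartitionId = 1)   // existing partition count, so new IDs are 1 and 2
    // newAssignment is keyed by the new partition IDs (1 and 2),
    // each mapped to a replication-factor-sized broker list.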

http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index e762115..21c1186 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -93,7 +93,7 @@ object CreateTopicCommand extends Logging {
     else
       getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
     debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
   }
 
   def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5ac38fd..b07e27b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -215,6 +215,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       initializeControllerContext()
       replicaStateMachine.startup()
       partitionStateMachine.startup()
+      // register the partition change listeners for all existing topics on failover
+      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
       Utils.registerMBean(this, KafkaController.MBeanName)
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
       initializeAndMaybeTriggerPartitionReassignment()

http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index deebed0..0135d45 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 import kafka.utils.{Logging, ZkUtils}
-import org.I0Itec.zkclient.IZkChildListener
+import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.log4j.Logger
 
@@ -334,7 +334,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   def registerPartitionChangeListener(topic: String) = {
-    zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
+    zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic))
   }
 
   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
@@ -383,15 +383,31 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
-  class PartitionChangeListener(topic: String) extends IZkChildListener with Logging {
-    this.logIdent = "[Controller " + controller.config.brokerId + "]: "
+
+  class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
+
+    this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "
 
     @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+    def handleDataChange(dataPath : String, data: Object) {
       controllerContext.controllerLock synchronized {
-        // TODO: To be completed as part of KAFKA-41
+        try {
+          info("Add Partition triggered " + data.toString + " for path " + dataPath)
+          val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+          val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p =>
+            !controllerContext.partitionReplicaAssignment.contains(p._1))
+          info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
+          controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
+        } catch {
+          case e => error("Error while handling add partitions for data path " + dataPath, e )
+        }
       }
     }
+
+    @throws(classOf[Exception])
+    def handleDataDeleted(parentPath : String) {
+      // this is not implemented for partition change
+    }
   }
 }
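
The controller now watches the topic's assignment znode itself (a data watch) rather than its children, so a write from createOrUpdateTopicPartitionAssignmentPathInZK is what triggers creation of the newly added partition IDs. As a standalone, hypothetical illustration of the ZkClient API used here (not the controller's actual listener), a data listener looks like this:

    import org.I0Itec.zkclient.{IZkDataListener, ZkClient}

    // Log whenever the replica assignment stored at a topic's znode changes.
    class LoggingAssignmentListener(topic: String) extends IZkDataListener {
      def handleDataChange(dataPath: String, data: Object) {
        println("Assignment for topic %s changed at %s: %s".format(topic, dataPath, data))
      }
      def handleDataDeleted(dataPath: String) {
        // nothing to do for this illustration
      }
    }

    // Assuming zkClient is a connected ZkClient and the standard topic path layout:
    // zkClient.subscribeDataChanges("/brokers/topics/" + topic, new LoggingAssignmentListener(topic))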
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
new file mode 100644
index 0000000..06be990
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import junit.framework.Assert._
+import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.cluster.Broker
+import kafka.client.ClientUtils
+import kafka.server.{KafkaConfig, KafkaServer}
+
+class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val brokerId1 = 0
+  val brokerId2 = 1
+  val brokerId3 = 2
+  val brokerId4 = 3
+
+  val port1 = TestUtils.choosePort()
+  val port2 = TestUtils.choosePort()
+  val port3 = TestUtils.choosePort()
+  val port4 = TestUtils.choosePort()
+
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
+  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
+
+  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+  var brokers: Seq[Broker] = Seq.empty[Broker]
+
+  val partitionId = 0
+
+  val topic1 = "new-topic1"
+  val topic2 = "new-topic2"
+  val topic3 = "new-topic3"
+  val topic4 = "new-topic4"
+
+  override def setUp() {
+    super.setUp()
+    // start all the servers
+    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+    val server3 = TestUtils.createServer(new KafkaConfig(configProps3))
+    val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
+
+    servers ++= List(server1, server2, server3, server4)
+    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
+
+    // create topics with one partition each: topic1, topic2 and topic4 get 2 replicas, topic3 gets 4
+    CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1")
+    CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2")
+    CreateTopicCommand.createTopic(zkClient, topic3, 1, 4, "2:3:0:1")
+    CreateTopicCommand.createTopic(zkClient, topic4, 1, 4, "0:3")
+
+
+    // wait until leader is elected
+    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId, 500)
+    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId, 500)
+    var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId, 500)
+    var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId, 500)
+
+    debug("Leader for " + topic1  + " is elected to be: %s".format(leader1.getOrElse(-1)))
+    debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1)))
+    debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1)))
+    debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1)))
+
+    assertTrue("Leader should get elected", leader1.isDefined)
+    assertTrue("Leader should get elected", leader2.isDefined)
+    assertTrue("Leader should get elected", leader3.isDefined)
+    assertTrue("Leader should get elected", leader4.isDefined)
+
+    assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+    assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2))
+    assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3))
+    assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3))
+  }
+
+  override def tearDown() {
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDirs))
+    super.tearDown()
+  }
+
+  def testTopicDoesNotExist {
+    try {
+      AddPartitionsCommand.addPartitions(zkClient, "Blah", 1)
+      fail("Topic should not exist")
+    } catch {
+      case e: AdministrationException => //this is good
+      case e2 => throw e2
+    }
+  }
+
+  def testWrongReplicaCount {
+    try {
+      AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2")
+      fail("Add partitions should fail")
+    } catch {
+      case e: AdministrationException => //this is good
+      case e2 => throw e2
+    }
+  }
+
+  def testIncrementPartitions {
+    AddPartitionsCommand.addPartitions(zkClient, topic1, 2)
+    // wait until leader is elected
+    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500)
+    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500)
+    val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 1).get
+    val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get
+    assertEquals(leader1.get, leader1FromZk)
+    assertEquals(leader2.get, leader2FromZk)
+
+    // read metadata from a broker and verify the new topic partitions exist
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2, 1000)
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions",
+      2000,0).topicsMetadata
+    val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
+    val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata
+    assertEquals(partitionDataForTopic1.size, 3)
+    assertEquals(partitionDataForTopic1(1).partitionId, 1)
+    assertEquals(partitionDataForTopic1(2).partitionId, 2)
+    val replicas = partitionDataForTopic1(1).replicas
+    assertEquals(replicas.size, 2)
+    assert(replicas.contains(partitionDataForTopic1(1).leader.get))
+  }
+
+  def testManualAssignmentOfReplicas {
+    AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3")
+    // wait until leader is elected
+    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500)
+    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500)
+    val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 1).get
+    val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 2).get
+    assertEquals(leader1.get, leader1FromZk)
+    assertEquals(leader2.get, leader2FromZk)
+
+    // read metadata from a broker and verify the new topic partitions exist
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2, 1000)
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas",
+      2000,0).topicsMetadata
+    val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2))
+    val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata
+    assertEquals(partitionDataForTopic2.size, 3)
+    assertEquals(partitionDataForTopic2(1).partitionId, 1)
+    assertEquals(partitionDataForTopic2(2).partitionId, 2)
+    val replicas = partitionDataForTopic2(1).replicas
+    assertEquals(replicas.size, 2)
+    assert(replicas(0).id == 0 || replicas(0).id == 1)
+    assert(replicas(1).id == 0 || replicas(1).id == 1)
+  }
+
+  def testReplicaPlacement {
+    AddPartitionsCommand.addPartitions(zkClient, topic3, 6)
+    // wait until leader is elected
+    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500)
+    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500)
+    var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 3, 500)
+    var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 4, 500)
+    var leader5 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 5, 500)
+    var leader6 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 6, 500)
+
+    val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 1).get
+    val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 2).get
+    val leader3FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 3).get
+    val leader4FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 4).get
+    val leader5FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 5).get
+    val leader6FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 6).get
+
+    assertEquals(leader1.get, leader1FromZk)
+    assertEquals(leader2.get, leader2FromZk)
+    assertEquals(leader3.get, leader3FromZk)
+    assertEquals(leader4.get, leader4FromZk)
+    assertEquals(leader5.get, leader5FromZk)
+    assertEquals(leader6.get, leader6FromZk)
+
+    // read metadata from a broker and verify the new topic partitions exist
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6, 1000)
+
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement",
+      2000,0).topicsMetadata
+
+    val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head
+    val partition1DataForTopic3 = metaDataForTopic3.partitionsMetadata(1)
+    val partition2DataForTopic3 = metaDataForTopic3.partitionsMetadata(2)
+    val partition3DataForTopic3 = metaDataForTopic3.partitionsMetadata(3)
+    val partition4DataForTopic3 = metaDataForTopic3.partitionsMetadata(4)
+    val partition5DataForTopic3 = metaDataForTopic3.partitionsMetadata(5)
+    val partition6DataForTopic3 = metaDataForTopic3.partitionsMetadata(6)
+
+    assertEquals(partition1DataForTopic3.replicas.size, 4)
+    assertEquals(partition1DataForTopic3.replicas(0).id, 3)
+    assertEquals(partition1DataForTopic3.replicas(1).id, 2)
+    assertEquals(partition1DataForTopic3.replicas(2).id, 0)
+    assertEquals(partition1DataForTopic3.replicas(3).id, 1)
+
+    assertEquals(partition2DataForTopic3.replicas.size, 4)
+    assertEquals(partition2DataForTopic3.replicas(0).id, 0)
+    assertEquals(partition2DataForTopic3.replicas(1).id, 3)
+    assertEquals(partition2DataForTopic3.replicas(2).id, 1)
+    assertEquals(partition2DataForTopic3.replicas(3).id, 2)
+
+    assertEquals(partition3DataForTopic3.replicas.size, 4)
+    assertEquals(partition3DataForTopic3.replicas(0).id, 1)
+    assertEquals(partition3DataForTopic3.replicas(1).id, 0)
+    assertEquals(partition3DataForTopic3.replicas(2).id, 2)
+    assertEquals(partition3DataForTopic3.replicas(3).id, 3)
+
+    assertEquals(partition4DataForTopic3.replicas.size, 4)
+    assertEquals(partition4DataForTopic3.replicas(0).id, 2)
+    assertEquals(partition4DataForTopic3.replicas(1).id, 3)
+    assertEquals(partition4DataForTopic3.replicas(2).id, 0)
+    assertEquals(partition4DataForTopic3.replicas(3).id, 1)
+
+    assertEquals(partition5DataForTopic3.replicas.size, 4)
+    assertEquals(partition5DataForTopic3.replicas(0).id, 3)
+    assertEquals(partition5DataForTopic3.replicas(1).id, 0)
+    assertEquals(partition5DataForTopic3.replicas(2).id, 1)
+    assertEquals(partition5DataForTopic3.replicas(3).id, 2)
+
+    assertEquals(partition6DataForTopic3.replicas.size, 4)
+    assertEquals(partition6DataForTopic3.replicas(0).id, 0)
+    assertEquals(partition6DataForTopic3.replicas(1).id, 1)
+    assertEquals(partition6DataForTopic3.replicas(2).id, 2)
+    assertEquals(partition6DataForTopic3.replicas(3).id, 3)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 0d8b70f..dc0013f 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -157,7 +157,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val topic = "test"
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap
@@ -166,7 +166,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
 
     try {
-      AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
       fail("shouldn't be able to create a topic already exists")
     } catch {
       case e: TopicExistsException => // this is good
@@ -181,7 +181,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // reassign partition 0
     val newReplicas = Seq(0, 2, 3)
     val partitionToBeReassigned = 0
@@ -206,7 +206,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // reassign partition 0
     val newReplicas = Seq(1, 2, 3)
     val partitionToBeReassigned = 0
@@ -232,7 +232,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
@@ -273,7 +273,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // put the partition in the reassigned path as well
     // reassign partition 0
     val newReplicas = Seq(0, 1)
@@ -312,7 +312,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // broker 2 should be the leader since it was started first
     val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
@@ -333,7 +333,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000)