You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:48 UTC
[03/36] git commit: kafka-347;
change number of partitions of a topic online; patched by Sriram Subramanian;
reviewed by Neha Narkehede, Guozhang Wang, Joel Koshy and Jun Rao
kafka-347; change number of partitions of a topic online; patched by Sriram Subramanian; reviewed by Neha Narkehede, Guozhang Wang, Joel Koshy and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3817857b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3817857b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3817857b
Branch: refs/heads/trunk
Commit: 3817857b15f9ce03f10a9730f0ff4619d728b06f
Parents: b1891e7
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Mon Jul 22 21:33:28 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jul 22 21:33:28 2013 -0700
----------------------------------------------------------------------
bin/kafka-add-partitions.sh | 19 ++
.../kafka/admin/AddPartitionsCommand.scala | 127 ++++++++++
.../src/main/scala/kafka/admin/AdminUtils.scala | 27 +-
.../scala/kafka/admin/CreateTopicCommand.scala | 2 +-
.../kafka/controller/KafkaController.scala | 2 +
.../controller/PartitionStateMachine.scala | 28 ++-
.../unit/kafka/admin/AddPartitionsTest.scala | 251 +++++++++++++++++++
.../test/scala/unit/kafka/admin/AdminTest.scala | 16 +-
8 files changed, 448 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/bin/kafka-add-partitions.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-add-partitions.sh b/bin/kafka-add-partitions.sh
new file mode 100755
index 0000000..7d217e2
--- /dev/null
+++ b/bin/kafka-add-partitions.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.AddPartitionsCommand $@
http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
new file mode 100644
index 0000000..5757c32
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import scala.collection.mutable
+import kafka.common.TopicAndPartition
+
+object AddPartitionsCommand extends Logging {
+
+ def main(args: Array[String]): Unit = {
+ val parser = new OptionParser
+ val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need to be added.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+ "Multiple URLS can be given to allow fail-over.")
+ .withRequiredArg
+ .describedAs("urls")
+ .ofType(classOf[String])
+ val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add to the topic")
+ .withRequiredArg
+ .describedAs("# of partitions")
+ .ofType(classOf[java.lang.Integer])
+ val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers for the new partitions")
+ .withRequiredArg
+ .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
+ "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
+ .ofType(classOf[String])
+ .defaultsTo("")
+
+ val options = parser.parse(args : _*)
+
+ for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("***Please note that this tool can only be used to add partitions when data for a topic does not use a key.***\n" +
+ "Missing required argument. " + " \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val topic = options.valueOf(topicOpt)
+ val zkConnect = options.valueOf(zkConnectOpt)
+ val nPartitions = options.valueOf(nPartitionsOpt).intValue
+ val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
+ var zkClient: ZkClient = null
+ try {
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+ println("adding partitions succeeded!")
+ } catch {
+ case e =>
+ println("adding partitions failed because of " + e.getMessage)
+ println(Utils.stackTrace(e))
+ } finally {
+ if (zkClient != null)
+ zkClient.close()
+ }
+ }
+
+ def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
+ val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+ if (existingPartitionsReplicaList.size == 0)
+ throw new AdministrationException("The topic %s does not exist".format(topic))
+
+ val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get
+
+ // create the new partition replication list
+ val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+ val newPartitionReplicaList = if (replicaAssignmentStr == "")
+ AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
+ else
+ getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
+
+ // check if manual assignment has the right replication factor
+ val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
+ if (unmatchedRepFactorList.size != 0)
+ throw new AdministrationException("The replication factor in manual replication assignment " +
+ " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
+
+ info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
+ val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
+ // add the new list
+ partitionReplicaList ++= newPartitionReplicaList
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true)
+ }
+
+ def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
+ val partitionList = replicaAssignmentList.split(",")
+ val ret = new mutable.HashMap[Int, List[Int]]()
+ var partitionId = startPartitionId
+ for (i <- 0 until partitionList.size) {
+ val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+ if (brokerList.size <= 0)
+ throw new AdministrationException("replication factor must be larger than 0")
+ if (brokerList.size != brokerList.toSet.size)
+ throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList)
+ if (!brokerList.toSet.subsetOf(availableBrokerList))
+ throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString +
+ "available broker:" + availableBrokerList.toString)
+ ret.put(partitionId, brokerList.toList)
+ if (ret(partitionId).size != ret(startPartitionId).size)
+ throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
+ partitionId = partitionId + 1
+ }
+ ret.toMap
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 41cb764..c399bc7 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -48,7 +48,7 @@ object AdminUtils extends Logging {
* p7 p8 p9 p5 p6 (3nd replica)
*/
def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
- fixedStartIndex: Int = -1) // for testing only
+ fixedStartIndex: Int = -1, startPartitionId: Int = -1)
: Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new AdministrationException("number of partitions must be larger than 0")
@@ -59,25 +59,34 @@ object AdminUtils extends Logging {
" larger than available brokers: " + brokerList.size)
val ret = new mutable.HashMap[Int, List[Int]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+ var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
- var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+ var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
- if (i > 0 && (i % brokerList.size == 0))
- secondReplicaShift += 1
- val firstReplicaIndex = (i + startIndex) % brokerList.size
+ if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
+ nextReplicaShift += 1
+ val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
var replicaList = List(brokerList(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
- replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
- ret.put(i, replicaList.reverse)
+ replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
+ ret.put(currentPartitionId, replicaList.reverse)
+ currentPartitionId = currentPartitionId + 1
}
ret.toMap
}
- def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) {
+ def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient, update: Boolean = false) {
try {
val zkPath = ZkUtils.getTopicPath(topic)
val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
- ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
+
+ if (!update) {
+ info("Topic creation " + jsonPartitionData.toString)
+ ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
+ } else {
+ info("Topic update " + jsonPartitionData.toString)
+ ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData)
+ }
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index e762115..21c1186 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -93,7 +93,7 @@ object CreateTopicCommand extends Logging {
else
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5ac38fd..b07e27b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -215,6 +215,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
initializeControllerContext()
replicaStateMachine.startup()
partitionStateMachine.startup()
+ // register the partition change listeners for all existing topics on failover
+ controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
Utils.registerMBean(this, KafkaController.MBeanName)
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
initializeAndMaybeTriggerPartitionReassignment()
http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index deebed0..0135d45 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
import kafka.utils.{Logging, ZkUtils}
-import org.I0Itec.zkclient.IZkChildListener
+import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.log4j.Logger
@@ -334,7 +334,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
def registerPartitionChangeListener(topic: String) = {
- zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
+ zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic))
}
private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
@@ -383,15 +383,31 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
}
- class PartitionChangeListener(topic: String) extends IZkChildListener with Logging {
- this.logIdent = "[Controller " + controller.config.brokerId + "]: "
+
+ class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
+
+ this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "
@throws(classOf[Exception])
- def handleChildChange(parentPath : String, children : java.util.List[String]) {
+ def handleDataChange(dataPath : String, data: Object) {
controllerContext.controllerLock synchronized {
- // TODO: To be completed as part of KAFKA-41
+ try {
+ info("Add Partition triggered " + data.toString + " for path " + dataPath)
+ val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+ val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p =>
+ !controllerContext.partitionReplicaAssignment.contains(p._1))
+ info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
+ controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
+ } catch {
+ case e => error("Error while handling add partitions for data path " + dataPath, e )
+ }
}
}
+
+ @throws(classOf[Exception])
+ def handleDataDeleted(parentPath : String) {
+ // this is not implemented for partition change
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
new file mode 100644
index 0000000..06be990
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import junit.framework.Assert._
+import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.cluster.Broker
+import kafka.client.ClientUtils
+import kafka.server.{KafkaConfig, KafkaServer}
+
+class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
+ val brokerId1 = 0
+ val brokerId2 = 1
+ val brokerId3 = 2
+ val brokerId4 = 3
+
+ val port1 = TestUtils.choosePort()
+ val port2 = TestUtils.choosePort()
+ val port3 = TestUtils.choosePort()
+ val port4 = TestUtils.choosePort()
+
+ val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
+ val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+ val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
+ val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
+
+ var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+ var brokers: Seq[Broker] = Seq.empty[Broker]
+
+ val partitionId = 0
+
+ val topic1 = "new-topic1"
+ val topic2 = "new-topic2"
+ val topic3 = "new-topic3"
+ val topic4 = "new-topic4"
+
+ override def setUp() {
+ super.setUp()
+ // start all the servers
+ val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+ val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+ val server3 = TestUtils.createServer(new KafkaConfig(configProps3))
+ val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
+
+ servers ++= List(server1, server2, server3, server4)
+ brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
+
+ // create topics with 1 partition, 2 replicas, one on each broker
+ CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1")
+ CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2")
+ CreateTopicCommand.createTopic(zkClient, topic3, 1, 4, "2:3:0:1")
+ CreateTopicCommand.createTopic(zkClient, topic4, 1, 4, "0:3")
+
+
+ // wait until leader is elected
+ var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId, 500)
+ var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId, 500)
+ var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId, 500)
+ var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId, 500)
+
+ debug("Leader for " + topic1 + " is elected to be: %s".format(leader1.getOrElse(-1)))
+ debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1)))
+ debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1)))
+ debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1)))
+
+ assertTrue("Leader should get elected", leader1.isDefined)
+ assertTrue("Leader should get elected", leader2.isDefined)
+ assertTrue("Leader should get elected", leader3.isDefined)
+ assertTrue("Leader should get elected", leader4.isDefined)
+
+ assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+ assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2))
+ assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3))
+ assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3))
+ }
+
+ override def tearDown() {
+ servers.map(server => server.shutdown())
+ servers.map(server => Utils.rm(server.config.logDirs))
+ super.tearDown()
+ }
+
+ def testTopicDoesNotExist {
+ try {
+ AddPartitionsCommand.addPartitions(zkClient, "Blah", 1)
+ fail("Topic should not exist")
+ } catch {
+ case e: AdministrationException => //this is good
+ case e2 => throw e2
+ }
+ }
+
+ def testWrongReplicaCount {
+ try {
+ AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2")
+ fail("Add partitions should fail")
+ } catch {
+ case e: AdministrationException => //this is good
+ case e2 => throw e2
+ }
+ }
+
+ def testIncrementPartitions {
+ AddPartitionsCommand.addPartitions(zkClient, topic1, 2)
+ // wait until leader is elected
+ var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500)
+ var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500)
+ val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 1).get
+ val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get
+ assertEquals(leader1.get, leader1FromZk)
+ assertEquals(leader2.get, leader2FromZk)
+
+ // read metadata from a broker and verify the new topic partitions exist
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1, 1000)
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2, 1000)
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions",
+ 2000,0).topicsMetadata
+ val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
+ val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata
+ assertEquals(partitionDataForTopic1.size, 3)
+ assertEquals(partitionDataForTopic1(1).partitionId, 1)
+ assertEquals(partitionDataForTopic1(2).partitionId, 2)
+ val replicas = partitionDataForTopic1(1).replicas
+ assertEquals(replicas.size, 2)
+ assert(replicas.contains(partitionDataForTopic1(1).leader.get))
+ }
+
+ def testManualAssignmentOfReplicas {
+ AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3")
+ // wait until leader is elected
+ var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500)
+ var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500)
+ val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 1).get
+ val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 2).get
+ assertEquals(leader1.get, leader1FromZk)
+ assertEquals(leader2.get, leader2FromZk)
+
+ // read metadata from a broker and verify the new topic partitions exist
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1, 1000)
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2, 1000)
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas",
+ 2000,0).topicsMetadata
+ val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2))
+ val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata
+ assertEquals(partitionDataForTopic2.size, 3)
+ assertEquals(partitionDataForTopic2(1).partitionId, 1)
+ assertEquals(partitionDataForTopic2(2).partitionId, 2)
+ val replicas = partitionDataForTopic2(1).replicas
+ assertEquals(replicas.size, 2)
+ assert(replicas(0).id == 0 || replicas(0).id == 1)
+ assert(replicas(1).id == 0 || replicas(1).id == 1)
+ }
+
+ def testReplicaPlacement {
+ AddPartitionsCommand.addPartitions(zkClient, topic3, 6)
+ // wait until leader is elected
+ var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500)
+ var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500)
+ var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 3, 500)
+ var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 4, 500)
+ var leader5 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 5, 500)
+ var leader6 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 6, 500)
+
+ val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 1).get
+ val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 2).get
+ val leader3FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 3).get
+ val leader4FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 4).get
+ val leader5FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 5).get
+ val leader6FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 6).get
+
+ assertEquals(leader1.get, leader1FromZk)
+ assertEquals(leader2.get, leader2FromZk)
+ assertEquals(leader3.get, leader3FromZk)
+ assertEquals(leader4.get, leader4FromZk)
+ assertEquals(leader5.get, leader5FromZk)
+ assertEquals(leader6.get, leader6FromZk)
+
+ // read metadata from a broker and verify the new topic partitions exist
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1, 1000)
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2, 1000)
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3, 1000)
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4, 1000)
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5, 1000)
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6, 1000)
+
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement",
+ 2000,0).topicsMetadata
+
+ val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head
+ val partition1DataForTopic3 = metaDataForTopic3.partitionsMetadata(1)
+ val partition2DataForTopic3 = metaDataForTopic3.partitionsMetadata(2)
+ val partition3DataForTopic3 = metaDataForTopic3.partitionsMetadata(3)
+ val partition4DataForTopic3 = metaDataForTopic3.partitionsMetadata(4)
+ val partition5DataForTopic3 = metaDataForTopic3.partitionsMetadata(5)
+ val partition6DataForTopic3 = metaDataForTopic3.partitionsMetadata(6)
+
+ assertEquals(partition1DataForTopic3.replicas.size, 4)
+ assertEquals(partition1DataForTopic3.replicas(0).id, 3)
+ assertEquals(partition1DataForTopic3.replicas(1).id, 2)
+ assertEquals(partition1DataForTopic3.replicas(2).id, 0)
+ assertEquals(partition1DataForTopic3.replicas(3).id, 1)
+
+ assertEquals(partition2DataForTopic3.replicas.size, 4)
+ assertEquals(partition2DataForTopic3.replicas(0).id, 0)
+ assertEquals(partition2DataForTopic3.replicas(1).id, 3)
+ assertEquals(partition2DataForTopic3.replicas(2).id, 1)
+ assertEquals(partition2DataForTopic3.replicas(3).id, 2)
+
+ assertEquals(partition3DataForTopic3.replicas.size, 4)
+ assertEquals(partition3DataForTopic3.replicas(0).id, 1)
+ assertEquals(partition3DataForTopic3.replicas(1).id, 0)
+ assertEquals(partition3DataForTopic3.replicas(2).id, 2)
+ assertEquals(partition3DataForTopic3.replicas(3).id, 3)
+
+ assertEquals(partition4DataForTopic3.replicas.size, 4)
+ assertEquals(partition4DataForTopic3.replicas(0).id, 2)
+ assertEquals(partition4DataForTopic3.replicas(1).id, 3)
+ assertEquals(partition4DataForTopic3.replicas(2).id, 0)
+ assertEquals(partition4DataForTopic3.replicas(3).id, 1)
+
+ assertEquals(partition5DataForTopic3.replicas.size, 4)
+ assertEquals(partition5DataForTopic3.replicas(0).id, 3)
+ assertEquals(partition5DataForTopic3.replicas(1).id, 0)
+ assertEquals(partition5DataForTopic3.replicas(2).id, 1)
+ assertEquals(partition5DataForTopic3.replicas(3).id, 2)
+
+ assertEquals(partition6DataForTopic3.replicas.size, 4)
+ assertEquals(partition6DataForTopic3.replicas(0).id, 0)
+ assertEquals(partition6DataForTopic3.replicas(1).id, 1)
+ assertEquals(partition6DataForTopic3.replicas(2).id, 2)
+ assertEquals(partition6DataForTopic3.replicas(3).id, 3)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 0d8b70f..dc0013f 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -157,7 +157,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val topic = "test"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap
@@ -166,7 +166,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
try {
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
fail("shouldn't be able to create a topic already exists")
} catch {
case e: TopicExistsException => // this is good
@@ -181,7 +181,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
// reassign partition 0
val newReplicas = Seq(0, 2, 3)
val partitionToBeReassigned = 0
@@ -206,7 +206,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
// reassign partition 0
val newReplicas = Seq(1, 2, 3)
val partitionToBeReassigned = 0
@@ -232,7 +232,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
// reassign partition 0
val newReplicas = Seq(2, 3)
val partitionToBeReassigned = 0
@@ -273,7 +273,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1))
val topic = "test"
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
// put the partition in the reassigned path as well
// reassign partition 0
val newReplicas = Seq(0, 1)
@@ -312,7 +312,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
@@ -333,7 +333,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000)