You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/01/14 00:48:28 UTC
svn commit: r1231400 - in /incubator/kafka/branches/0.8: bin/
core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/utils/
core/src/test/scala/unit/kafka/admin/
Author: junrao
Date: Fri Jan 13 23:48:27 2012
New Revision: 1231400
URL: http://svn.apache.org/viewvc?rev=1231400&view=rev
Log:
create/delete ZK path for a topic in an admin tool; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-237
Added:
incubator/kafka/branches/0.8/bin/kafka-create-topic.sh
incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh
incubator/kafka/branches/0.8/bin/kafka-list-topic.sh
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
Added: incubator/kafka/branches/0.8/bin/kafka-create-topic.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-create-topic.sh?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-create-topic.sh (added)
+++ incubator/kafka/branches/0.8/bin/kafka-create-topic.sh Fri Jan 13 23:48:27 2012
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.CreateTopicCommand $@
Added: incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh (added)
+++ incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh Fri Jan 13 23:48:27 2012
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.DeleteTopicCommand $@
Added: incubator/kafka/branches/0.8/bin/kafka-list-topic.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-list-topic.sh?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-list-topic.sh (added)
+++ incubator/kafka/branches/0.8/bin/kafka-list-topic.sh Fri Jan 13 23:48:27 2012
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.ListTopicCommand $@
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.util.Random
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.utils.{SystemTime, Utils, ZkUtils}
+
+object AdminUtils {
+ val rand = new Random
+
+ /**
+ * There are 2 goals of replica assignment:
+ * 1. Spread the replicas evenly among brokers.
+ * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
+ *
+ * To achieve this goal, we:
+ * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
+ * 2. Assign the remaining replicas of each partition with an increasing shift.
+ *
+ * Here is an example of assigning
+ * broker-0 broker-1 broker-2 broker-3 broker-4
+ * p0 p1 p2 p3 p4 (1st replica)
+ * p5 p6 p7 p8 p9 (1st replica)
+ * p4 p0 p1 p2 p3 (2nd replica)
+ * p8 p9 p5 p6 p7 (2nd replica)
+ * p3 p4 p0 p1 p2 (3nd replica)
+ * p7 p8 p9 p5 p6 (3nd replica)
+ */
+ def assginReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
+ fixedStartIndex: Int = -1) // for testing only
+ : Array[List[String]] = {
+ if (nPartitions <= 0)
+ throw new AdministrationException("number of partitions must be larger than 0")
+ if (replicationFactor <= 0)
+ throw new AdministrationException("replication factor must be larger than 0")
+ if (replicationFactor > brokerList.size)
+ throw new AdministrationException("replication factor: " + replicationFactor +
+ " larger than available brokers: " + brokerList.size)
+ val ret = new Array[List[String]](nPartitions)
+ val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+
+ var secondReplicaShift = -1
+ for (i <- 0 until nPartitions) {
+ if (i % brokerList.size == 0)
+ secondReplicaShift += 1
+ val firstReplicaIndex = (i + startIndex) % brokerList.size
+ var replicaList = List(brokerList(firstReplicaIndex))
+ for (j <- 0 until replicationFactor - 1)
+ replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
+ ret(i) = replicaList.reverse
+ }
+
+ ret
+ }
+
+ def createReplicaAssignmentPathInZK(topic: String, replicaAssignmentList: Seq[List[String]], zkClient: ZkClient) {
+ try {
+ val topicVersion = SystemTime.milliseconds
+ ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerTopicsPath + "/" + topic, topicVersion.toString)
+ for (i <- 0 until replicaAssignmentList.size) {
+ val zkPath = ZkUtils.getTopicPartReplicasPath(topic, i.toString)
+ ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
+ }
+ }
+ catch {
+ case e: ZkNodeExistsException =>
+ throw new AdministrationException("topic " + topic + " already exists, with version "
+ + ZkUtils.getTopicVersion (zkClient, topic))
+ case e2 =>
+ throw new AdministrationException(e2.toString)
+ }
+ }
+
+ def getTopicMetaDataFromZK(topic: String, zkClient: ZkClient): Option[Seq[PartitionMetaData]] = {
+ if (!ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+ return None
+
+ val topicPartitionsPath = ZkUtils.getTopicPartsPath(topic)
+ val partitions = ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).sortWith((s,t) => s.toInt < t.toInt)
+ val ret = new Array[PartitionMetaData](partitions.size)
+ for (i <-0 until ret.size) {
+ val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartReplicasPath(topic, partitions(i)))
+ val inSync = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartInSyncPath(topic, partitions(i)))
+ val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartLeaderPath(topic, partitions(i)))
+ ret(i) = new PartitionMetaData(partitions(i),
+ Utils.getCSVList(replicas).toList,
+ Utils.getCSVList(inSync).toList,
+ if (leader == null) None else Some(leader)
+ )
+ }
+ Some(ret)
+ }
+
+ private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
+ val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
+ (firstReplicaIndex + shift) % nBrokers
+ }
+}
+
+class AdministrationException(val errorMessage: String) extends RuntimeException(errorMessage) {
+ def this() = this(null)
+}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+
+object CreateTopicCommand {
+
+ def main(args: Array[String]): Unit = {
+ val parser = new OptionParser
+ val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be created.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+ "Multiple URLS can be given to allow fail-over.")
+ .withRequiredArg
+ .describedAs("urls")
+ .ofType(classOf[String])
+ val nPartitionsOpt = parser.accepts("partition", "number of partitions in the topic")
+ .withRequiredArg
+ .describedAs("# of partitions")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
+ val replicationFactorOpt = parser.accepts("replica", "replication factor for each partitions in the topic")
+ .withRequiredArg
+ .describedAs("replication factor")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
+ val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manuallly assigning replicas to brokers")
+ .withRequiredArg
+ .describedAs("broker_id_for_part1_replica1:broker_id_for_part1_replica2,broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...")
+ .ofType(classOf[String])
+ .defaultsTo("")
+
+ val options = parser.parse(args : _*)
+
+ for(arg <- List(topicOpt, zkConnectOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val topic = options.valueOf(topicOpt)
+ val zkConnect = options.valueOf(zkConnectOpt)
+ val nPartitions = options.valueOf(nPartitionsOpt).intValue
+ val replicationFactor = options.valueOf(replicationFactorOpt).intValue
+ val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
+ var zkClient: ZkClient = null
+ try {
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+ var replicaAssignment: Seq[List[String]] = null
+
+ if (replicaAssignmentStr == "")
+ replicaAssignment = AdminUtils.assginReplicasToBrokers(brokerList, nPartitions, replicationFactor)
+ else
+ replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
+ AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+ println("creation succeeded!")
+ }
+ catch {
+ case e =>
+ println("creation failed because of " + e.getMessage)
+ println(Utils.stackTrace(e))
+ }
+ finally {
+ if (zkClient != null)
+ zkClient.close()
+ }
+ }
+
+ def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Array[List[String]] = {
+ val partitionList = replicaAssignmentList.split(",")
+ val ret = new Array[List[String]](partitionList.size)
+ for (i <- 0 until partitionList.size) {
+ val brokerList = partitionList(i).split(":")
+ if (brokerList.size <= 0)
+ throw new AdministrationException("replication factor must be larger than 0")
+ if (brokerList.size != brokerList.toSet.size)
+ throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList)
+ if (!brokerList.toSet.subsetOf(availableBrokerList))
+ throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList +
+ "available broker:" + availableBrokerList)
+ ret(i) = brokerList.toList
+ if (ret(i).size != ret(0).size)
+ throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
+ }
+ ret
+ }
+}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
+
+object DeleteTopicCommand {
+
+ def main(args: Array[String]): Unit = {
+ val parser = new OptionParser
+ val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+ "Multiple URLS can be given to allow fail-over.")
+ .withRequiredArg
+ .describedAs("urls")
+ .ofType(classOf[String])
+
+ val options = parser.parse(args : _*)
+
+ for(arg <- List(topicOpt, zkConnectOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val topic = options.valueOf(topicOpt)
+ val zkConnect = options.valueOf(zkConnectOpt)
+ var zkClient: ZkClient = null
+ try {
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
+ println("deletion succeeded!")
+ }
+ catch {
+ case e =>
+ println("delection failed because of " + e.getMessage)
+ println(Utils.stackTrace(e))
+ }
+ finally {
+ if (zkClient != null)
+ zkClient.close()
+ }
+ }
+}
\ No newline at end of file
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
+
+object ListTopicCommand {
+
+ def main(args: Array[String]): Unit = {
+ val parser = new OptionParser
+ val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ .defaultsTo("")
+ val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+ "Multiple URLS can be given to allow fail-over.")
+ .withRequiredArg
+ .describedAs("urls")
+ .ofType(classOf[String])
+
+ val options = parser.parse(args : _*)
+
+ for(arg <- List(zkConnectOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val topic = options.valueOf(topicOpt)
+ val zkConnect = options.valueOf(zkConnectOpt)
+ var zkClient: ZkClient = null
+ try {
+ var topicList: Seq[String] = Nil
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+
+ if (topic == "")
+ topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath)
+ else
+ topicList = List(topic)
+
+ if (topicList.size <= 0)
+ println("no topics exist!")
+
+ for (t <- topicList)
+ showTopic(t, zkClient)
+ }
+ catch {
+ case e =>
+ println("list topic failed because of " + e.getMessage)
+ println(Utils.stackTrace(e))
+ }
+ finally {
+ if (zkClient != null)
+ zkClient.close()
+ }
+ }
+
+ def showTopic(topic: String, zkClient: ZkClient) {
+ val topicMetaData = AdminUtils.getTopicMetaDataFromZK(topic, zkClient)
+ topicMetaData match {
+ case None =>
+ println("topic " + topic + " doesn't exist!")
+ case Some(tmd) =>
+ println("topic: " + topic)
+ for (part <- tmd)
+ println(part.toString)
+ }
+ }
+}
\ No newline at end of file
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+class PartitionMetaData(val partitionId: String,
+ val replicaList: Seq[String],
+ val inSyncList: Seq[String],
+ val leaderId: Option[String]) {
+
+ override def toString(): String = {
+ val builder = new StringBuilder
+ builder.append("partition id: " + partitionId)
+ builder.append(" replica list: " + replicaList.mkString(","))
+ builder.append(" in-sync list: " + inSyncList.mkString(","))
+ builder.append(" leader: " + leaderId)
+ builder.toString
+ }
+
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1231400&r1=1231399&r2=1231400&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Fri Jan 13 23:48:27 2012
@@ -553,6 +553,16 @@ object Utils extends Logging {
}
}
+ def seqToCSV(seq: Seq[String]): String = {
+ var csvString = ""
+ for (i <- 0 until seq.size) {
+ if (i > 0)
+ csvString = csvString + ','
+ csvString = csvString + seq(i)
+ }
+ csvString
+ }
+
def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
val successMsg = "The retention hour for "
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1231400&r1=1231399&r2=1231400&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Fri Jan 13 23:48:27 2012
@@ -29,6 +29,38 @@ object ZkUtils extends Logging {
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
+ def getTopicPath(topic: String): String ={
+ BrokerTopicsPath + "/" + topic
+ }
+
+ def getTopicPartsPath(topic: String): String ={
+ getTopicPath(topic) + "/" + "partitions"
+ }
+
+ def getTopicPartPath(topic: String, partitionId: String): String ={
+ getTopicPartsPath(topic) + "/" + partitionId
+ }
+
+ def getTopicVersion(zkClient: ZkClient, topic: String): String ={
+ readDataMaybeNull(zkClient, getTopicPath(topic))
+ }
+
+ def getTopicPartReplicasPath(topic: String, partitionId: String): String ={
+ getTopicPartPath(topic, partitionId) + "/" + "replicas"
+ }
+
+ def getTopicPartInSyncPath(topic: String, partitionId: String): String ={
+ getTopicPartPath(topic, partitionId) + "/" + "isr"
+ }
+
+ def getTopicPartLeaderPath(topic: String, partitionId: String): String ={
+ getTopicPartPath(topic, partitionId) + "/" + "leader"
+ }
+
+ def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
+ ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+ }
+
/**
* make sure a persistent path exists in ZK. Create the path if not exist.
*/
@@ -94,6 +126,21 @@ object ZkUtils extends Logging {
}
/**
+ * Create an persistent node with the given path and data. Create parents if necessary.
+ */
+ def createPersistentPath(client: ZkClient, path: String, data: String): Unit = {
+ try {
+ client.createPersistent(path, data)
+ }
+ catch {
+ case e: ZkNoNodeException => {
+ createParentPath(client, path)
+ client.createPersistent(path, data)
+ }
+ }
+ }
+
+ /**
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
*/
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import junit.framework.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestZKUtils
+
+class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
+ val zkConnect = TestZKUtils.zookeeperConnect
+
+ @Test
+ def testReplicaAssignment() {
+ val brokerList = List("0", "1", "2", "3", "4")
+
+ // test 0 replication factor
+ try {
+ AdminUtils.assginReplicasToBrokers(brokerList, 10, 0)
+ fail("shouldn't allow replication factor 0")
+ }
+ catch {
+ case e: AdministrationException => // this is good
+ case e2 => throw e2
+ }
+
+ // test wrong replication factor
+ try {
+ AdminUtils.assginReplicasToBrokers(brokerList, 10, 6)
+ fail("shouldn't allow replication factor larger than # of brokers")
+ }
+ catch {
+ case e: AdministrationException => // this is good
+ case e2 => throw e2
+ }
+
+ // correct assignment
+ {
+ val expectedAssignment = Array(
+ List("0", "1", "2"),
+ List("1", "2", "3"),
+ List("2", "3", "4"),
+ List("3", "4", "0"),
+ List("4", "0", "1"),
+ List("0", "2", "3"),
+ List("1", "3", "4"),
+ List("2", "4", "0"),
+ List("3", "0", "1"),
+ List("4", "1", "2")
+ )
+
+ val actualAssignment = AdminUtils.assginReplicasToBrokers(brokerList, 10, 3, 0)
+ val e = (expectedAssignment.toList == actualAssignment.toList)
+ assertTrue(expectedAssignment.toList == actualAssignment.toList)
+ }
+ }
+
+ @Test
+ def testManualReplicaAssignment() {
+ val brokerList = Set("0", "1", "2", "3", "4")
+
+ // duplicated brokers
+ try {
+ val replicationAssignmentStr = "0,0,1:1,2,3"
+ CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
+ fail("replication assginment shouldn't have duplicated brokers")
+ }
+ catch {
+ case e: AdministrationException => // this is good
+ case e2 => throw e2
+ }
+
+ // non-exist brokers
+ try {
+ val replicationAssignmentStr = "0,1,2:1,2,7"
+ CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
+ fail("replication assginment shouldn't contain non-exist brokers")
+ }
+ catch {
+ case e: AdministrationException => // this is good
+ case e2 => throw e2
+ }
+
+ // inconsistent replication factor
+ try {
+ val replicationAssignmentStr = "0,1,2:1,2"
+ CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
+ fail("all partitions should have the same replication factor")
+ }
+ catch {
+ case e: AdministrationException => // this is good
+ case e2 => throw e2
+ }
+
+ // good assignment
+ {
+ val replicationAssignmentStr = "0:1:2,1:2:3"
+ val expectedReplicationAssignment = Array(
+ List("0", "1", "2"),
+ List("1", "2", "3")
+ )
+ val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
+ assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList)
+ }
+ }
+
+ @Test
+ def testTopicCreationInZK() {
+ val expectedReplicationAssignment = Array(
+ List("0", "1", "2"),
+ List("1", "2", "3"),
+ List("2", "3", "4"),
+ List("3", "4", "0"),
+ List("4", "0", "1"),
+ List("0", "2", "3"),
+ List("1", "3", "4"),
+ List("2", "4", "0"),
+ List("3", "0", "1"),
+ List("4", "1", "2"),
+ List("1", "2", "3"),
+ List("1", "3", "4")
+ )
+ val topic = "test"
+ AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client)
+ val actualReplicationAssignment = AdminUtils.getTopicMetaDataFromZK(topic, zookeeper.client).get.map(p => p.replicaList)
+ assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList)
+
+ try {
+ AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client)
+ fail("shouldn't be able to create a topic already exist")
+ }
+ catch {
+ case e: AdministrationException => // this is good
+ case e2 => throw e2
+ }
+ }
+}