Posted to commits@kafka.apache.org by ju...@apache.org on 2012/01/14 00:48:28 UTC

svn commit: r1231400 - in /incubator/kafka/branches/0.8: bin/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/utils/ core/src/test/scala/unit/kafka/admin/

Author: junrao
Date: Fri Jan 13 23:48:27 2012
New Revision: 1231400

URL: http://svn.apache.org/viewvc?rev=1231400&view=rev
Log:
create/delete ZK path for a topic in an admin tool; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-237

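For orientation, the three new bin/kafka-*-topic.sh scripts below simply wrap the corresponding kafka.admin.*Command classes added in this commit. A minimal sketch of driving the same classes directly, assuming a local ZooKeeper at localhost:2181 and an illustrative topic name (both placeholders, not part of this commit), with brokers already registered under /brokers/ids:

    // Hypothetical end-to-end exercise of the new admin commands (placeholder ZK address and topic name).
    object AdminCommandsSketch {
      def main(args: Array[String]): Unit = {
        val zk = "localhost:2181"   // assumed local ZooKeeper
        // Create a topic with 10 partitions and replication factor 3.
        kafka.admin.CreateTopicCommand.main(Array("--zookeeper", zk, "--topic", "test", "--partition", "10", "--replica", "3"))
        // List all topics (omitting --topic lists everything under /brokers/topics).
        kafka.admin.ListTopicCommand.main(Array("--zookeeper", zk))
        // Remove the topic's ZK path again.
        kafka.admin.DeleteTopicCommand.main(Array("--zookeeper", zk, "--topic", "test"))
      }
    }
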
Added:
    incubator/kafka/branches/0.8/bin/kafka-create-topic.sh
    incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh
    incubator/kafka/branches/0.8/bin/kafka-list-topic.sh
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala

Added: incubator/kafka/branches/0.8/bin/kafka-create-topic.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-create-topic.sh?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-create-topic.sh (added)
+++ incubator/kafka/branches/0.8/bin/kafka-create-topic.sh Fri Jan 13 23:48:27 2012
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname "$0")
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.CreateTopicCommand "$@"

Added: incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh (added)
+++ incubator/kafka/branches/0.8/bin/kafka-delete-topic.sh Fri Jan 13 23:48:27 2012
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname "$0")
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.DeleteTopicCommand "$@"

Added: incubator/kafka/branches/0.8/bin/kafka-list-topic.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-list-topic.sh?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-list-topic.sh (added)
+++ incubator/kafka/branches/0.8/bin/kafka-list-topic.sh Fri Jan 13 23:48:27 2012
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname "$0")
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.ListTopicCommand "$@"

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.util.Random
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.utils.{SystemTime, Utils, ZkUtils}
+
+object AdminUtils {
+  val rand = new Random
+
+  /**
+   * There are 2 goals of replica assignment:
+   * 1. Spread the replicas evenly among brokers.
+   * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
+   *
+   * To achieve this goal, we:
+   * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
+   * 2. Assign the remaining replicas of each partition with an increasing shift.
+   *
+   * Here is an example of assigning 10 partitions to 5 brokers with a replication factor of 3:
+   * broker-0  broker-1  broker-2  broker-3  broker-4
+   * p0        p1        p2        p3        p4       (1st replica)
+   * p5        p6        p7        p8        p9       (1st replica)
+   * p4        p0        p1        p2        p3       (2nd replica)
+   * p8        p9        p5        p6        p7       (2nd replica)
+   * p3        p4        p0        p1        p2       (3rd replica)
+   * p7        p8        p9        p5        p6       (3rd replica)
+   */
+  def assginReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
+          fixedStartIndex: Int = -1)  // for testing only
+    : Array[List[String]] = {
+    if (nPartitions <= 0)
+      throw new AdministrationException("number of partitions must be larger than 0")
+    if (replicationFactor <= 0)
+      throw new AdministrationException("replication factor must be larger than 0")
+    if (replicationFactor > brokerList.size)
+      throw new AdministrationException("replication factor: " + replicationFactor +
+              " is larger than the number of available brokers: " + brokerList.size)
+    val ret = new Array[List[String]](nPartitions)
+    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+
+    var secondReplicaShift = -1
+    for (i <- 0 until nPartitions) {
+      if (i % brokerList.size == 0)
+        secondReplicaShift += 1
+      val firstReplicaIndex = (i + startIndex) % brokerList.size
+      var replicaList = List(brokerList(firstReplicaIndex))
+      for (j <- 0 until replicationFactor - 1)
+        replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
+      ret(i) = replicaList.reverse
+    }
+
+    ret
+  }
+
+  def createReplicaAssignmentPathInZK(topic: String, replicaAssignmentList: Seq[List[String]], zkClient: ZkClient) {
+    try {
+      val topicVersion = SystemTime.milliseconds
+      ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerTopicsPath + "/" + topic, topicVersion.toString)
+      for (i <- 0 until replicaAssignmentList.size) {
+        val zkPath = ZkUtils.getTopicPartReplicasPath(topic, i.toString)
+        ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
+      }
+    }
+    catch {
+      case e: ZkNodeExistsException =>
+        throw new AdministrationException("topic " + topic + " already exists, with version "
+          + ZkUtils.getTopicVersion(zkClient, topic))
+      case e2 =>
+        throw new AdministrationException(e2.toString)      
+    }
+  }
+
+  def getTopicMetaDataFromZK(topic: String, zkClient: ZkClient): Option[Seq[PartitionMetaData]] = {
+    if (!ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+      return None
+    
+    val topicPartitionsPath = ZkUtils.getTopicPartsPath(topic)
+    val partitions = ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).sortWith((s,t) => s.toInt < t.toInt)
+    val ret = new Array[PartitionMetaData](partitions.size)
+    for (i <- 0 until ret.size) {
+      val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartReplicasPath(topic, partitions(i)))
+      val inSync = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartInSyncPath(topic, partitions(i)))
+      val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartLeaderPath(topic, partitions(i)))
+      ret(i) = new PartitionMetaData(partitions(i),
+                                     Utils.getCSVList(replicas).toList,
+                                     Utils.getCSVList(inSync).toList,
+                                     if (leader == null) None else Some(leader)
+                                    )
+    }
+    Some(ret)
+  }
+
+  private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
+    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
+    (firstReplicaIndex + shift) % nBrokers
+  }
+}
+
+class AdministrationException(val errorMessage: String) extends RuntimeException(errorMessage) {
+  def this() = this(null)
+}

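As a sanity check of the assignment scheme described in the scaladoc above, a small sketch that reproduces the 5-broker, 10-partition, 3-replica example (fixedStartIndex = 0 pins the otherwise random starting broker, just as the unit test below does):

    // Expected first lines of output, matching the table in the scaladoc: p0 -> 0,1,2  p1 -> 1,2,3  ...
    val brokers = List("0", "1", "2", "3", "4")
    val assignment = kafka.admin.AdminUtils.assginReplicasToBrokers(brokers, nPartitions = 10, replicationFactor = 3, fixedStartIndex = 0)
    assignment.zipWithIndex.foreach { case (replicas, partition) =>
      println("p" + partition + " -> " + replicas.mkString(","))
    }
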
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+
+object CreateTopicCommand {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be created.")
+                         .withRequiredArg
+                         .describedAs("topic")
+                         .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+                                      "Multiple URLs can be given to allow fail-over.")
+                           .withRequiredArg
+                           .describedAs("urls")
+                           .ofType(classOf[String])
+    val nPartitionsOpt = parser.accepts("partition", "number of partitions in the topic")
+                           .withRequiredArg
+                           .describedAs("# of partitions")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1)
+    val replicationFactorOpt = parser.accepts("replica", "replication factor for each partition in the topic")
+                           .withRequiredArg
+                           .describedAs("replication factor")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1)
+    val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers")
+                           .withRequiredArg
+                           .describedAs("broker_id_for_part1_replica1:broker_id_for_part1_replica2,broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...")
+                           .ofType(classOf[String])
+                           .defaultsTo("")
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val topic = options.valueOf(topicOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val nPartitions = options.valueOf(nPartitionsOpt).intValue
+    val replicationFactor = options.valueOf(replicationFactorOpt).intValue
+    val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+      var replicaAssignment: Seq[List[String]] = null
+
+      if (replicaAssignmentStr == "")
+        replicaAssignment = AdminUtils.assginReplicasToBrokers(brokerList, nPartitions, replicationFactor)
+      else
+        replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
+      AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+      println("creation succeeded!")
+    }
+    catch {
+      case e =>
+        println("creation failed because of " + e.getMessage)
+        println(Utils.stackTrace(e))
+    }
+    finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Array[List[String]] = {
+    val partitionList = replicaAssignmentList.split(",")
+    val ret = new Array[List[String]](partitionList.size)
+    for (i <- 0 until partitionList.size) {
+      val brokerList = partitionList(i).split(":")
+      if (brokerList.size <= 0)
+        throw new AdministrationException("replication factor must be larger than 0")
+      if (brokerList.size != brokerList.toSet.size)
+        throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList.mkString(":"))
+      if (!brokerList.toSet.subsetOf(availableBrokerList))
+        throw new AdministrationException("some specified brokers are not available. specified brokers: " + brokerList.mkString(":") +
+                "; available brokers: " + availableBrokerList.mkString(","))
+      ret(i) = brokerList.toList
+      if (ret(i).size != ret(0).size)
+        throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
+    }
+    ret
+  }
+}

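For the --replica-assignment-list option parsed above, ':' separates the replicas of one partition and ',' separates partitions; a brief illustration (the broker ids are placeholders and must already be registered in ZooKeeper):

    // "0:1:2,1:2:3" => partition 0 on brokers 0,1,2 and partition 1 on brokers 1,2,3.
    val assignment = kafka.admin.CreateTopicCommand.getManualReplicaAssignment(
      "0:1:2,1:2:3", availableBrokerList = Set("0", "1", "2", "3", "4"))
    // assignment(0) == List("0", "1", "2") and assignment(1) == List("1", "2", "3")
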
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
+
+object DeleteTopicCommand {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.")
+                         .withRequiredArg
+                         .describedAs("topic")
+                         .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+                                      "Multiple URLs can be given to allow fail-over.")
+                           .withRequiredArg
+                           .describedAs("urls")
+                           .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val topic = options.valueOf(topicOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
+      println("deletion succeeded!")
+    }
+    catch {
+      case e =>
+        println("deletion failed because of " + e.getMessage)
+        println(Utils.stackTrace(e))
+    }
+    finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
+
+object ListTopicCommand {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "The topic to be listed. Defaults to listing all existing topics.")
+                         .withRequiredArg
+                         .describedAs("topic")
+                         .ofType(classOf[String])
+                         .defaultsTo("")
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+                                      "Multiple URLs can be given to allow fail-over.")
+                           .withRequiredArg
+                           .describedAs("urls")
+                           .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val topic = options.valueOf(topicOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    var zkClient: ZkClient = null
+    try {
+      var topicList: Seq[String] = Nil
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+
+      if (topic == "")
+        topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath)
+      else
+        topicList = List(topic)
+
+      if (topicList.size <= 0)
+        println("no topics exist!")
+
+      for (t <- topicList)
+        showTopic(t, zkClient)
+    }
+    catch {
+      case e =>
+        println("listing topics failed because of " + e.getMessage)
+        println(Utils.stackTrace(e))
+    }
+    finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  def showTopic(topic: String, zkClient: ZkClient) {
+    val topicMetaData = AdminUtils.getTopicMetaDataFromZK(topic, zkClient)
+    topicMetaData match {
+      case None =>
+        println("topic " + topic + " doesn't exist!")
+      case Some(tmd) =>
+        println("topic: " + topic)
+        for (part <- tmd)
+          println(part.toString)
+    }
+  }
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+class PartitionMetaData(val partitionId: String,
+                        val replicaList: Seq[String],
+                        val inSyncList: Seq[String],
+                        val leaderId: Option[String]) {
+
+  override def toString(): String = {
+    val builder = new StringBuilder
+    builder.append("partition id: " + partitionId)
+    builder.append(" replica list: " + replicaList.mkString(","))
+    builder.append(" in-sync list: " + inSyncList.mkString(","))
+    builder.append(" leader: " + leaderId.getOrElse("none"))
+    builder.toString
+  }
+
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1231400&r1=1231399&r2=1231400&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Fri Jan 13 23:48:27 2012
@@ -553,6 +553,16 @@ object Utils extends Logging {
     }
   }
 
+  def seqToCSV(seq: Seq[String]): String = {
+    var csvString = ""
+    for (i <- 0 until seq.size) {
+      if (i > 0)
+        csvString = csvString + ','
+      csvString = csvString + seq(i)
+    }
+    csvString
+  }
+
   def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
     val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
     val successMsg =  "The retention hour for "

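The new seqToCSV helper produces the same string that Scala's standard mkString would; a tiny equivalence check as a sketch:

    val replicas = List("0", "1", "2")
    assert(kafka.utils.Utils.seqToCSV(replicas) == replicas.mkString(","))   // both yield "0,1,2"
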
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1231400&r1=1231399&r2=1231400&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Fri Jan 13 23:48:27 2012
@@ -29,6 +29,38 @@ object ZkUtils extends Logging {
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
 
+  def getTopicPath(topic: String): String = {
+    BrokerTopicsPath + "/" + topic
+  }
+
+  def getTopicPartsPath(topic: String): String = {
+    getTopicPath(topic) + "/" + "partitions"
+  }
+
+  def getTopicPartPath(topic: String, partitionId: String): String = {
+    getTopicPartsPath(topic) + "/" + partitionId
+  }
+
+  def getTopicVersion(zkClient: ZkClient, topic: String): String = {
+    readDataMaybeNull(zkClient, getTopicPath(topic))
+  }
+
+  def getTopicPartReplicasPath(topic: String, partitionId: String): String = {
+    getTopicPartPath(topic, partitionId) + "/" + "replicas"
+  }
+
+  def getTopicPartInSyncPath(topic: String, partitionId: String): String = {
+    getTopicPartPath(topic, partitionId) + "/" + "isr"
+  }
+
+  def getTopicPartLeaderPath(topic: String, partitionId: String): String = {
+    getTopicPartPath(topic, partitionId) + "/" + "leader"
+  }
+
+  def getSortedBrokerList(zkClient: ZkClient): Seq[String] = {
+    ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+  }
+
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
@@ -94,6 +126,21 @@ object ZkUtils extends Logging {
   }
 
   /**
+   * Create a persistent node with the given path and data. Create parents if necessary.
+   */
+  def createPersistentPath(client: ZkClient, path: String, data: String): Unit = {
+    try {
+      client.createPersistent(path, data)
+    }
+    catch {
+      case e: ZkNoNodeException => {
+        createParentPath(client, path)
+        client.createPersistent(path, data)
+      }
+    }
+  }
+
+  /**
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
    */

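For reference, the path helpers added above lay a topic out in ZooKeeper as follows (the topic name "test" and partition id "0" are illustrative):

    import kafka.utils.ZkUtils
    println(ZkUtils.getTopicPath("test"))                    // /brokers/topics/test
    println(ZkUtils.getTopicPartsPath("test"))               // /brokers/topics/test/partitions
    println(ZkUtils.getTopicPartReplicasPath("test", "0"))   // /brokers/topics/test/partitions/0/replicas
    println(ZkUtils.getTopicPartInSyncPath("test", "0"))     // /brokers/topics/test/partitions/0/isr
    println(ZkUtils.getTopicPartLeaderPath("test", "0"))     // /brokers/topics/test/partitions/0/leader
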
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1231400&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Fri Jan 13 23:48:27 2012
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import junit.framework.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestZKUtils
+
+class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val zkConnect = TestZKUtils.zookeeperConnect
+ 
+  @Test
+  def testReplicaAssignment() {
+    val brokerList = List("0", "1", "2", "3", "4")
+
+    // test 0 replication factor
+    try {
+      AdminUtils.assginReplicasToBrokers(brokerList, 10, 0)
+      fail("shouldn't allow replication factor 0")
+    }
+    catch {
+      case e: AdministrationException => // this is good
+      case e2 => throw e2
+    }
+
+    // test wrong replication factor
+    try {
+      AdminUtils.assginReplicasToBrokers(brokerList, 10, 6)
+      fail("shouldn't allow replication factor larger than # of brokers")
+    }
+    catch {
+      case e: AdministrationException => // this is good
+      case e2 => throw e2
+    }
+
+    // correct assignment
+    {
+      val expectedAssignment = Array(
+        List("0", "1", "2"),
+        List("1", "2", "3"),
+        List("2", "3", "4"),
+        List("3", "4", "0"),
+        List("4", "0", "1"),
+        List("0", "2", "3"),
+        List("1", "3", "4"),
+        List("2", "4", "0"),
+        List("3", "0", "1"),
+        List("4", "1", "2")
+        )
+
+      val actualAssignment = AdminUtils.assginReplicasToBrokers(brokerList, 10, 3, 0)
+      assertTrue(expectedAssignment.toList == actualAssignment.toList)
+    }
+  }
+
+  @Test
+  def testManualReplicaAssignment() {
+    val brokerList = Set("0", "1", "2", "3", "4")
+
+    // duplicated brokers
+    try {
+      val replicationAssignmentStr = "0,0,1:1,2,3"
+      CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
+      fail("replica assignment shouldn't have duplicate brokers")
+    }
+    catch {
+      case e: AdministrationException => // this is good
+      case e2 => throw e2
+    }
+
+    // non-existent brokers
+    try {
+      val replicationAssignmentStr = "0,1,2:1,2,7"
+      CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
+      fail("replica assignment shouldn't contain non-existent brokers")
+    }
+    catch {
+      case e: AdministrationException => // this is good
+      case e2 => throw e2
+    }
+
+    // inconsistent replication factor
+    try {
+      val replicationAssignmentStr = "0,1,2:1,2"
+      CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
+      fail("all partitions should have the same replication factor")
+    }
+    catch {
+      case e: AdministrationException => // this is good
+      case e2 => throw e2
+    }
+
+    // good assignment
+    {
+      val replicationAssignmentStr = "0:1:2,1:2:3"
+      val expectedReplicationAssignment = Array(
+        List("0", "1", "2"),
+        List("1", "2", "3")
+      )
+      val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
+      assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList)
+    }
+  }
+
+  @Test
+  def testTopicCreationInZK() {
+    val expectedReplicationAssignment = Array(
+      List("0", "1", "2"),
+      List("1", "2", "3"),
+      List("2", "3", "4"),
+      List("3", "4", "0"),
+      List("4", "0", "1"),
+      List("0", "2", "3"),
+      List("1", "3", "4"),
+      List("2", "4", "0"),
+      List("3", "0", "1"),
+      List("4", "1", "2"),
+      List("1", "2", "3"),
+      List("1", "3", "4")      
+      )
+    val topic = "test"
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client)
+    val actualReplicationAssignment = AdminUtils.getTopicMetaDataFromZK(topic, zookeeper.client).get.map(p => p.replicaList)
+    assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList)
+
+    try {
+      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client)
+      fail("shouldn't be able to create a topic that already exists")
+    }
+    catch {
+      case e: AdministrationException => // this is good
+      case e2 => throw e2
+    }
+  }
+}