You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/01/24 21:54:13 UTC

svn commit: r1235492 [1/2] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/...

Author: nehanarkhede
Date: Tue Jan 24 20:54:11 2012
New Revision: 1235492

URL: http://svn.apache.org/viewvc?rev=1235492&view=rev
Log:
KAFKA-238 getTopicMetadata API; patched by nehanarkhede; reviewed by junrao

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
    incubator/kafka/branches/0.8/project/build.properties

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Tue Jan 24 20:54:11 2012
@@ -20,9 +20,12 @@ package kafka.admin
 import java.util.Random
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import kafka.utils.{SystemTime, Utils, ZkUtils}
+import kafka.api.{TopicMetadata, PartitionMetadata}
+import kafka.utils.{Logging, SystemTime, Utils, ZkUtils}
+import kafka.cluster.Broker
+import collection.mutable.HashMap
 
-object AdminUtils {
+object AdminUtils extends Logging {
   val rand = new Random
 
   /**
@@ -43,7 +46,7 @@ object AdminUtils {
    * p3        p4        p0        p1        p2       (3nd replica)
    * p7        p8        p9        p5        p6       (3nd replica)
    */
-  def assginReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
+  def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
           fixedStartIndex: Int = -1)  // for testing only
     : Array[List[String]] = {
     if (nPartitions <= 0)
@@ -75,8 +78,9 @@ object AdminUtils {
       val topicVersion = SystemTime.milliseconds
       ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerTopicsPath + "/" + topic, topicVersion.toString)
       for (i <- 0 until replicaAssignmentList.size) {
-        val zkPath = ZkUtils.getTopicPartReplicasPath(topic, i.toString)
+        val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString)
         ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
+        debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i))))
       }
     }
     catch {
@@ -88,24 +92,47 @@ object AdminUtils {
     }
   }
 
-  def getTopicMetaDataFromZK(topic: String, zkClient: ZkClient): Option[Seq[PartitionMetaData]] = {
-    if (!ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
-      return None
-    
-    val topicPartitionsPath = ZkUtils.getTopicPartsPath(topic)
-    val partitions = ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).sortWith((s,t) => s.toInt < t.toInt)
-    val ret = new Array[PartitionMetaData](partitions.size)
-    for (i <-0 until ret.size) {
-      val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartReplicasPath(topic, partitions(i)))
-      val inSync = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartInSyncPath(topic, partitions(i)))
-      val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartLeaderPath(topic, partitions(i)))
-      ret(i) = new PartitionMetaData(partitions(i),
-                                     Utils.getCSVList(replicas).toList,
-                                     Utils.getCSVList(inSync).toList,
-                                     if (leader == null) None else Some(leader)
-                                    )
+  def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[Option[TopicMetadata]] = {
+    val cachedBrokerInfo = new HashMap[Int, Broker]()
+
+    val metadataList = topics.map { topic =>
+      if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
+        val partitions = ZkUtils.getSortedPartitionIdsForTopic(zkClient, topic)
+        val partitionMetadata = new Array[PartitionMetadata](partitions.size)
+
+        for (i <-0 until partitionMetadata.size) {
+          val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
+          val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString))
+          val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitions(i).toString))
+          debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
+
+          partitionMetadata(i) = new PartitionMetadata(partitions(i),
+            if (leader == null) None else Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(leader.toInt)).head),
+            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
+            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)),
+            None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+        }
+        Some(new TopicMetadata(topic, partitionMetadata))
+      } else
+        None
+    }
+
+    metadataList.toList
+  }
+
+  private def getBrokerInfoFromCache(zkClient: ZkClient,
+                                     cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
+                                     brokerIds: Seq[Int]): Seq[Broker] = {
+    brokerIds.map { id =>
+      val optionalBrokerInfo = cachedBrokerInfo.get(id)
+      optionalBrokerInfo match {
+        case Some(brokerInfo) => brokerInfo // return broker info from the cache
+        case None => // fetch it from zookeeper
+          val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head
+          cachedBrokerInfo += (id -> brokerInfo)
+          brokerInfo
+      }
     }
-    Some(ret)
   }
 
   private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Tue Jan 24 20:54:11 2012
@@ -44,9 +44,10 @@ object CreateTopicCommand {
                            .describedAs("replication factor")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1)
-    val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manuallly assigning replicas to brokers")
+    val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers")
                            .withRequiredArg
-                           .describedAs("broker_id_for_part1_replica1:broker_id_for_part1_replica2,broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...")
+                           .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
+                                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
                            .ofType(classOf[String])
                            .defaultsTo("")
 
@@ -68,14 +69,7 @@ object CreateTopicCommand {
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      val brokerList = ZkUtils.getSortedBrokerList(zkClient)
-      var replicaAssignment: Seq[List[String]] = null
-
-      if (replicaAssignmentStr == "")
-        replicaAssignment = AdminUtils.assginReplicasToBrokers(brokerList, nPartitions, replicationFactor)
-      else
-        replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
-      AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
       println("creation succeeded!")
     }
     catch {
@@ -89,11 +83,22 @@ object CreateTopicCommand {
     }
   }
 
+  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+    var replicaAssignment: Seq[List[String]] = null
+
+    if (replicaAssignmentStr == "")
+      replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
+    else
+      replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
+    AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+  }
+
   def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Array[List[String]] = {
     val partitionList = replicaAssignmentList.split(",")
     val ret = new Array[List[String]](partitionList.size)
     for (i <- 0 until partitionList.size) {
-      val brokerList = partitionList(i).split(":")
+      val brokerList = partitionList(i).split(":").map(s => s.trim())
       if (brokerList.size <= 0)
         throw new AdministrationException("replication factor must be larger than 0")
       if (brokerList.size != brokerList.toSet.size)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala Tue Jan 24 20:54:11 2012
@@ -76,13 +76,13 @@ object ListTopicCommand {
   }
 
   def showTopic(topic: String, zkClient: ZkClient) {
-    val topicMetaData = AdminUtils.getTopicMetaDataFromZK(topic, zkClient)
+    val topicMetaData = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
     topicMetaData match {
       case None =>
         println("topic " + topic + " doesn't exist!")
       case Some(tmd) =>
         println("topic: " + topic)
-        for (part <- tmd)
+        for (part <- tmd.partitionsMetadata)
           println(part.toString)
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Tue Jan 24 20:54:11 2012
@@ -38,7 +38,7 @@ class FetchRequest(val topic: String,
                    val maxSize: Int) extends Request(RequestKeys.Fetch) {
   
   def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic, "UTF-8")
+    Utils.writeShortString(buffer, topic)
     buffer.putInt(partition)
     buffer.putLong(offset)
     buffer.putInt(maxSize)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala Tue Jan 24 20:54:11 2012
@@ -19,8 +19,6 @@ package kafka.api
 
 import java.nio._
 import kafka.network._
-import kafka.utils._
-import kafka.api._
 
 object MultiFetchRequest {
   def readFrom(buffer: ByteBuffer): MultiFetchRequest = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Tue Jan 24 20:54:11 2012
@@ -62,7 +62,7 @@ class OffsetRequest(val topic: String,
                     val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) {
 
   def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic, "UTF-8")
+    Utils.writeShortString(buffer, topic)
     buffer.putInt(partition)
     buffer.putLong(time)
     buffer.putInt(maxNumOffsets)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Tue Jan 24 20:54:11 2012
@@ -41,7 +41,7 @@ class ProducerRequest(val topic: String,
                       val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
 
   def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic, "UTF-8")
+    Utils.writeShortString(buffer, topic)
     buffer.putInt(partition)
     buffer.putInt(messages.serialized.limit)
     buffer.put(messages.serialized)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala Tue Jan 24 20:54:11 2012
@@ -23,4 +23,5 @@ object RequestKeys {
   val MultiFetch: Short = 2
   val MultiProduce: Short = 3
   val Offsets: Short = 4
+  val TopicMetadata: Short = 5
 }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala?rev=1235492&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala Tue Jan 24 20:54:11 2012
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.api
+
+import kafka.cluster.Broker
+import java.nio.ByteBuffer
+import kafka.utils.Utils._
+import collection.mutable.ListBuffer
+
+/**
+ * topic (2 bytes + topic.length)
+ * number of partitions (4 bytes)
+ *
+ * partition id (4 bytes)
+ *
+ * does leader exist (1 byte)
+ * leader info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ * number of replicas (2 bytes)
+ * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ * number of in sync replicas (2 bytes)
+ * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ *
+ * does log metadata exist (1 byte)
+ * number of log segments (4 bytes)
+ * total size of log in bytes (8 bytes)
+ *
+ * number of log segments (4 bytes)
+ * beginning offset (8 bytes)
+ * last modified timestamp (8 bytes)
+ * size of log segment (8 bytes)
+ *
+ */
+
+sealed trait LeaderRequest { def requestId: Byte }
+case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 }
+case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
+
+sealed trait LogSegmentMetadataRequest { def requestId: Byte }
+case object LogSegmentMetadataExists extends LogSegmentMetadataRequest { val requestId: Byte = 1 }
+case object LogSegmentMetadataDoesNotExist extends LogSegmentMetadataRequest { val requestId: Byte = 0 }
+
+object TopicMetadata {
+
+  def readFrom(buffer: ByteBuffer): TopicMetadata = {
+    val topic = readShortString(buffer)
+    val numPartitions = getIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
+    val partitionsMetadata = new ListBuffer[PartitionMetadata]()
+    for(i <- 0 until numPartitions)
+      partitionsMetadata += PartitionMetadata.readFrom(buffer)
+    new TopicMetadata(topic, partitionsMetadata)
+  }
+}
+
+case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata]) {
+  def sizeInBytes: Int = {
+    var size: Int = shortStringLength(topic)
+    size += partitionsMetadata.foldLeft(4 /* number of partitions */)(_ + _.sizeInBytes)
+    debug("Size of topic metadata = " + size)
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    /* topic */
+    writeShortString(buffer, topic)
+    /* number of partitions */
+    buffer.putInt(partitionsMetadata.size)
+    partitionsMetadata.foreach(m => m.writeTo(buffer))
+  }
+}
+
+object PartitionMetadata {
+
+  def readFrom(buffer: ByteBuffer): PartitionMetadata = {
+    val partitionId = getIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
+    val doesLeaderExist = getLeaderRequest(buffer.get)
+    val leader = doesLeaderExist match {
+      case LeaderExists => /* leader exists */
+        Some(Broker.readFrom(buffer))
+      case LeaderDoesNotExist => None
+    }
+
+    /* list of all replicas */
+    val numReplicas = getShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
+    val replicas = new Array[Broker](numReplicas)
+    for(i <- 0 until numReplicas) {
+      replicas(i) = Broker.readFrom(buffer)
+    }
+
+    /* list of in-sync replicas */
+    val numIsr = getShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
+    val isr = new Array[Broker](numIsr)
+    for(i <- 0 until numIsr) {
+      isr(i) = Broker.readFrom(buffer)
+    }
+
+    val doesLogMetadataExist = getLogSegmentMetadataRequest(buffer.get)
+    val logMetadata = doesLogMetadataExist match {
+      case LogSegmentMetadataExists =>
+        val numLogSegments = getIntInRange(buffer, "total number of log segments", (0, Int.MaxValue))
+        val totalDataSize = getLongInRange(buffer, "total data size", (0, Long.MaxValue))
+        val numSegmentMetadata = getIntInRange(buffer, "number of log segment metadata", (0, Int.MaxValue))
+        val segmentMetadata = numSegmentMetadata match {
+          case 0 => None
+          case _ =>
+            val metadata = new ListBuffer[LogSegmentMetadata]()
+            for(i <- 0 until numSegmentMetadata) {
+              val beginningOffset = getLongInRange(buffer, "beginning offset", (0, Long.MaxValue))
+              val lastModified = getLongInRange(buffer, "last modified time", (0, Long.MaxValue))
+              val size = getLongInRange(buffer, "size of log segment", (0, Long.MaxValue))
+              metadata += new LogSegmentMetadata(beginningOffset, lastModified, size)
+            }
+            Some(metadata)
+        }
+        Some(new LogMetadata(numLogSegments, totalDataSize, segmentMetadata))
+      case LogSegmentMetadataDoesNotExist => None
+    }
+    new PartitionMetadata(partitionId, leader, replicas, isr, logMetadata)
+  }
+
+  def getLeaderRequest(requestId: Byte): LeaderRequest = {
+    requestId match {
+      case LeaderExists.requestId => LeaderExists
+      case LeaderDoesNotExist.requestId => LeaderDoesNotExist
+      case _ => throw new IllegalArgumentException("Unknown leader request id " + requestId)
+    }
+  }
+
+  def getLogSegmentMetadataRequest(requestId: Byte): LogSegmentMetadataRequest = {
+    requestId match {
+      case LogSegmentMetadataExists.requestId => LogSegmentMetadataExists
+      case LogSegmentMetadataDoesNotExist.requestId => LogSegmentMetadataDoesNotExist
+    }
+  }
+}
+
+case class PartitionMetadata(partitionId: Int, leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker],
+                             logMetadata: Option[LogMetadata]) {
+  def sizeInBytes: Int = {
+    var size: Int = 4 /* partition id */ + 1 /* if leader exists*/
+
+    leader match {
+      case Some(l) => size += l.sizeInBytes
+      case None =>
+    }
+
+    size += 2 /* number of replicas */
+    size += replicas.foldLeft(0)(_ + _.sizeInBytes)
+    size += 2 /* number of in sync replicas */
+    size += isr.foldLeft(0)(_ + _.sizeInBytes)
+
+    size += 1 /* if log segment metadata exists */
+    logMetadata match {
+      case Some(metadata) => size += metadata.sizeInBytes
+      case None =>
+    }
+    debug("Size of partition metadata = " + size)
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(partitionId)
+
+    /* if leader exists*/
+    leader match {
+      case Some(l) =>
+        buffer.put(LeaderExists.requestId)
+        /* leader id host_name port */
+        l.writeTo(buffer)
+      case None => buffer.put(LeaderDoesNotExist.requestId)
+    }
+
+    /* number of replicas */
+    buffer.putShort(replicas.size.toShort)
+    replicas.foreach(r => r.writeTo(buffer))
+
+    /* number of in-sync replicas */
+    buffer.putShort(isr.size.toShort)
+    isr.foreach(r => r.writeTo(buffer))
+
+    /* if log segment metadata exists */
+    logMetadata match {
+      case Some(metadata) =>
+        buffer.put(LogSegmentMetadataExists.requestId)
+        metadata.writeTo(buffer)
+      case None => buffer.put(LogSegmentMetadataDoesNotExist.requestId)
+    }
+
+  }
+}
+
+case class LogMetadata(numLogSegments: Int, totalSize: Long, logSegmentMetadata: Option[Seq[LogSegmentMetadata]]) {
+  def sizeInBytes: Int = {
+    var size: Int = 4 /* num log segments */ + 8 /* total data size */ + 4 /* number of log segment metadata */
+    logSegmentMetadata match {
+      case Some(segmentMetadata) => size += segmentMetadata.foldLeft(0)(_ + _.sizeInBytes)
+      case None =>
+    }
+    debug("Size of log metadata = " + size)
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(numLogSegments)
+    buffer.putLong(totalSize)
+    /* if segment metadata exists */
+    logSegmentMetadata match {
+      case Some(segmentMetadata) =>
+        /* number of log segments */
+        buffer.putInt(segmentMetadata.size)
+        segmentMetadata.foreach(m => m.writeTo(buffer))
+      case None =>
+        buffer.putInt(0)
+    }
+  }
+}
+
+case class LogSegmentMetadata(beginningOffset: Long, lastModified: Long, size: Long) {
+  def sizeInBytes: Int = {
+    8 /* beginning offset */ + 8 /* last modified timestamp */ + 8 /* log segment size in bytes */
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putLong(beginningOffset)
+    buffer.putLong(lastModified)
+    buffer.putLong(size)
+  }
+}
+
+

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1235492&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Tue Jan 24 20:54:11 2012
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.utils.Utils._
+import kafka.network.{Send, Request}
+import java.nio.channels.GatheringByteChannel
+import kafka.common.ErrorMapping
+import collection.mutable.ListBuffer
+
+sealed trait DetailedMetadataRequest { def requestId: Short }
+case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
+case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] }
+
+object TopicMetadataRequest {
+
+  /**
+   * TopicMetadataRequest has the following format -
+   *
+   * number of topics (4 bytes) list of topics (2 bytes + topic.length per topic) detailedMetadata (2 bytes) timestamp (8 bytes) count (4 bytes)
+   *
+   * The detailedMetadata field is a placeholder for requesting various details about partition and log metadata
+   * By default, the value for this field is 0, which means it will just return leader, replica and ISR metadata for
+   * all partitions of the list of topics mentioned in the request.
+   */
+  def getDetailedMetadataRequest(requestId: Short): DetailedMetadataRequest = {
+    requestId match {
+      case SegmentMetadata.requestId => SegmentMetadata
+      case NoSegmentMetadata.requestId => NoSegmentMetadata
+      case _ => throw new IllegalArgumentException("Unknown detailed metadata request id " + requestId)
+    }
+  }
+
+  def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
+    val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
+    val topics = new ListBuffer[String]()
+    for(i <- 0 until numTopics)
+      topics += readShortString(buffer, "UTF-8")
+    val topicsList = topics.toList
+    val returnDetailedMetadata = getDetailedMetadataRequest(buffer.getShort)
+    var timestamp: Option[Long] = None
+    var count: Option[Int] = None
+    returnDetailedMetadata match {
+      case NoSegmentMetadata =>
+      case SegmentMetadata =>
+        timestamp = Some(buffer.getLong)
+        count = Some(buffer.getInt)
+      case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request "
+                                                    + returnDetailedMetadata.requestId)
+    }
+    debug("topic = %s, detailed metadata request = %d"
+          .format(topicsList.head, returnDetailedMetadata.requestId))
+    new TopicMetadataRequest(topics.toList, returnDetailedMetadata, timestamp, count)
+  }
+
+  def serializeTopicMetadata(topicMetadata: Seq[TopicMetadata]): ByteBuffer = {
+    val size = topicMetadata.foldLeft(4 /* num topics */)(_ + _.sizeInBytes)
+    val buffer = ByteBuffer.allocate(size)
+    /* number of topics */
+    buffer.putInt(topicMetadata.size)
+    /* topic partition_metadata */
+    topicMetadata.foreach(m => m.writeTo(buffer))
+    buffer.rewind()
+    buffer
+  }
+
+  def deserializeTopicsMetadataResponse(buffer: ByteBuffer): Seq[TopicMetadata] = {
+    /* number of topics */
+    val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
+    val topicMetadata = new Array[TopicMetadata](numTopics)
+    for(i <- 0 until  numTopics)
+      topicMetadata(i) = TopicMetadata.readFrom(buffer)
+    topicMetadata
+  }
+}
+
+case class TopicMetadataRequest(val topics: Seq[String],
+                                val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
+                                val timestamp: Option[Long] = None, val count: Option[Int] = None)
+  extends Request(RequestKeys.TopicMetadata){
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(topics.size)
+    topics.foreach(topic => writeShortString(buffer, topic))
+    buffer.putShort(detailedMetadata.requestId)
+    detailedMetadata match {
+      case SegmentMetadata =>
+        buffer.putLong(timestamp.get)
+        buffer.putInt(count.get)
+      case NoSegmentMetadata =>
+      case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
+    }
+  }
+
+  def sizeInBytes(): Int = {
+    var size: Int = 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
+                    2 /* detailed metadata */
+    detailedMetadata match {
+      case SegmentMetadata =>
+        size += 8 /* timestamp */ + 4 /* count */
+      case NoSegmentMetadata =>
+      case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
+    }
+    size
+  }
+}
+
+class TopicMetadataSend(topicsMetadata: Seq[TopicMetadata]) extends Send {
+  private var size: Int = topicsMetadata.foldLeft(0)(_ + _.sizeInBytes)
+  private val header = ByteBuffer.allocate(6)
+  val metadata = TopicMetadataRequest.serializeTopicMetadata(topicsMetadata)
+  header.putInt(size + 2)
+  header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
+  header.rewind()
+
+  var complete: Boolean = false
+
+  def writeTo(channel: GatheringByteChannel): Int = {
+    expectIncomplete()
+    var written = 0
+    if(header.hasRemaining)
+      written += channel.write(header)
+    if(!header.hasRemaining && metadata.hasRemaining)
+      written += channel.write(metadata)
+
+    if(!metadata.hasRemaining)
+      complete = true
+    written
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Tue Jan 24 20:54:11 2012
@@ -17,28 +17,49 @@
 
 package kafka.cluster
 
-import java.util.Arrays
-import kafka.utils._
-import java.net.InetAddress
-import kafka.server.KafkaConfig
-import util.parsing.json.JSON
+import kafka.utils.Utils._
+import java.nio.ByteBuffer
 
 /**
  * A Kafka broker
  */
 private[kafka] object Broker {
+
   def createBroker(id: Int, brokerInfoString: String): Broker = {
+    if(brokerInfoString == null)
+      throw new IllegalArgumentException("Broker id %s does not exist".format(id))
     val brokerInfo = brokerInfoString.split(":")
     new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
   }
+
+  def readFrom(buffer: ByteBuffer): Broker = {
+    val id = buffer.getInt
+    val creatorId = readShortString(buffer)
+    val host = readShortString(buffer)
+    val port = buffer.getInt
+    new Broker(id, creatorId, host, port)
+  }
 }
 
-private[kafka] class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
+private[kafka] case class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
   
   override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
 
   def getZKString(): String = new String(creatorId + ":" + host + ":" + port)
-  
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(id)
+    writeShortString(buffer, creatorId)
+    writeShortString(buffer, host)
+    buffer.putInt(port)
+  }
+
+  def sizeInBytes: Int = {
+    val size = shortStringLength(creatorId) + shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
+    debug("Size of broker info = " + size)
+    size
+  }
+
   override def equals(obj: Any): Boolean = {
     obj match {
       case null => false
@@ -47,6 +68,6 @@ private[kafka] class Broker(val id: Int,
     }
   }
   
-  override def hashCode(): Int = Utils.hashcode(id, host, port)
+  override def hashCode(): Int = hashcode(id, host, port)
   
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
 
 package kafka.cluster
 
-import kafka.utils._
 import scala.collection._
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
 
 package kafka.common
 
-import kafka.consumer._
 import kafka.message.InvalidMessageException
 import java.nio.ByteBuffer
 import java.lang.Throwable

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala?rev=1235492&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala Tue Jan 24 20:54:11 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.common
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKStringSerializer, ZKConfig}
+import java.util.concurrent.atomic.AtomicReference
+
+object KafkaZookeeperClient {
+  private val INSTANCE = new AtomicReference[ZkClient](null)
+
+  def getZookeeperClient(config: ZKConfig): ZkClient = {
+    // TODO: This cannot be a singleton since unit tests break if we do that
+//    INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+//                                              ZKStringSerializer))
+    INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+                                              ZKStringSerializer))
+    INSTANCE.get()
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Tue Jan 24 20:54:11 2012
@@ -21,13 +21,11 @@ import scala.collection.mutable._
 import scala.collection.JavaConversions._
 import org.I0Itec.zkclient._
 import joptsimple._
-import java.util.Arrays.asList
 import java.util.Properties
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
 import kafka.utils.{Utils, Logging}
-import kafka.utils.ZkUtils
 import kafka.utils.ZKStringSerializer
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Tue Jan 24 20:54:11 2012
@@ -18,7 +18,7 @@
 package kafka.consumer
 
 import scala.collection.JavaConversions._
-import kafka.utils.{Utils, ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.server.KafkaServerStartable

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala Tue Jan 24 20:54:11 2012
@@ -54,8 +54,7 @@ class OracleOffsetStorage(val connection
     } finally {
       commitOrRollback(connection, success)
     }
-    if(logger.isDebugEnabled)
-      logger.debug("Updated node " + node + " for topic '" + topic + "' to " + offset)
+    debug("Updated node " + node + " for topic '" + topic + "' to " + offset)
   }
   
   def close() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Tue Jan 24 20:54:11 2012
@@ -16,10 +16,8 @@
 */
 package kafka.javaapi
 
-import java.nio.ByteBuffer
 import kafka.serializer.Encoder
-import kafka.producer.{ProducerConfig, ProducerPool}
-import kafka.producer.async.{AsyncProducerConfig, QueueItem}
+import kafka.producer.async.QueueItem
 import kafka.utils.Logging
 
 private[javaapi] object Implicits extends Logging {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Tue Jan 24 20:54:11 2012
@@ -17,8 +17,7 @@
 
 package kafka.javaapi.message
 
-import java.nio.channels._
-import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
+import kafka.message.{MessageAndOffset, InvalidMessageException}
 
 /**
  * A set of messages. A message set has a fixed serialized form, though the container

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala Tue Jan 24 20:54:11 2012
@@ -54,7 +54,6 @@ class Producer[K,V](config: ProducerConf
    * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
    * object in the  send API
    */
-  import kafka.javaapi.Implicits._
   def this(config: ProducerConfig,
            encoder: Encoder[V],
            eventHandler: kafka.javaapi.producer.async.EventHandler[V],

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Tue Jan 24 20:54:11 2012
@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLat
 import kafka.server.{KafkaConfig, KafkaZooKeeper}
 import kafka.common.{InvalidTopicException, InvalidPartitionException}
 import kafka.api.OffsetRequest
+import org.I0Itec.zkclient.ZkClient
 
 /**
  * The guy who creates and hands out logs
@@ -38,15 +39,16 @@ private[kafka] class LogManager(val conf
                                 needRecovery: Boolean) extends Logging {
   
   val logDir: File = new File(config.logDir)
+  var kafkaZookeeper = new KafkaZooKeeper(config, this)
+
   private val numPartitions = config.numPartitions
   private val maxSize: Long = config.logFileSize
   private val flushInterval = config.flushInterval
   private val topicPartitionsMap = config.topicPartitionsMap
   private val logCreationLock = new Object
   private val random = new java.util.Random
-  private var kafkaZookeeper: KafkaZooKeeper = null
   private var zkActor: Actor = null
-  private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
+  private val startupLatch: CountDownLatch = new CountDownLatch(1)
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
@@ -82,29 +84,26 @@ private[kafka] class LogManager(val conf
     scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
   }
 
-  if(config.enableZookeeper) {
-    kafkaZookeeper = new KafkaZooKeeper(config, this)
-    kafkaZookeeper.startup
-    zkActor = new Actor {
-      def act() {
-        loop {
-          receive {
-            case topic: String =>
-              try {
-                kafkaZookeeper.registerTopicInZk(topic)
-              }
-              catch {
-                case e => error(e) // log it and let it go
-              }
-            case StopActor =>
-              info("zkActor stopped")
-              exit
-          }
+  kafkaZookeeper.startup
+  zkActor = new Actor {
+    def act() {
+      loop {
+        receive {
+          case topic: String =>
+            try {
+              kafkaZookeeper.registerTopicInZk(topic)
+            }
+            catch {
+              case e => error(e) // log it and let it go
+            }
+          case StopActor =>
+            info("zkActor stopped")
+            exit
         }
       }
     }
-    zkActor.start
   }
+  zkActor.start
 
   case object StopActor
 
@@ -119,24 +118,20 @@ private[kafka] class LogManager(val conf
    *  Register this broker in ZK for the first time.
    */
   def startup() {
-    if(config.enableZookeeper) {
-      kafkaZookeeper.registerBrokerInZk()
-      for (topic <- getAllTopics)
-        kafkaZookeeper.registerTopicInZk(topic)
-      startupLatch.countDown
-    }
+    kafkaZookeeper.registerBrokerInZk()
+    for (topic <- getAllTopics)
+      kafkaZookeeper.registerTopicInZk(topic)
+    startupLatch.countDown
     info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
     logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
   }
 
   private def awaitStartup() {
-    if (config.enableZookeeper)
-      startupLatch.await
+    startupLatch.await
   }
 
   private def registerNewTopicInZK(topic: String) {
-    if (config.enableZookeeper)
-      zkActor ! topic 
+    zkActor ! topic
   }
 
   /**
@@ -288,10 +283,8 @@ private[kafka] class LogManager(val conf
     val iter = getLogIterator
     while(iter.hasNext)
       iter.next.close()
-    if (config.enableZookeeper) {
-      zkActor ! StopActor
-      kafkaZookeeper.close
-    }
+    zkActor ! StopActor
+    kafkaZookeeper.close
   }
   
   private def getLogIterator(): Iterator[Log] = {
@@ -345,4 +338,7 @@ private[kafka] class LogManager(val conf
 
   def getAllTopics(): Iterator[String] = logs.keys.iterator
   def getTopicPartitionsMap() = topicPartitionsMap
+
+  def getServerConfig: KafkaConfig = config
+  def getZookeeperClient: ZkClient = kafkaZookeeper.zkClient
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala Tue Jan 24 20:54:11 2012
@@ -19,7 +19,6 @@ package kafka.message
 
 import java.io.InputStream
 import java.nio.ByteBuffer
-import scala.Math
 
 class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
   override def read():Int  = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
 
 package kafka.message
 
-import scala.collection.mutable
 import kafka.utils.Logging
 import kafka.common.{InvalidMessageSizeException, ErrorMapping}
 import java.nio.ByteBuffer

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala Tue Jan 24 20:54:11 2012
@@ -54,8 +54,8 @@ class GZIPCompression(inputStream: Input
 }
 
 class SnappyCompression(inputStream: InputStream,outputStream: ByteArrayOutputStream)  extends CompressionFacade(inputStream,outputStream) {
-  import org.xerial.snappy.{SnappyInputStream}
-  import org.xerial.snappy.{SnappyOutputStream}
+  import org.xerial.snappy.SnappyInputStream
+  import org.xerial.snappy.SnappyOutputStream
   
   val snappyIn:SnappyInputStream = if (inputStream == null) null else new SnappyInputStream(inputStream)
   val snappyOut:SnappyOutputStream = if (outputStream == null) null else new  SnappyOutputStream(outputStream)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Tue Jan 24 20:54:11 2012
@@ -22,8 +22,6 @@ import java.nio._
 import java.nio.channels._
 import java.util.concurrent.atomic._
 
-import kafka._
-import kafka.message._
 import kafka.utils._
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala Tue Jan 24 20:54:11 2012
@@ -18,9 +18,6 @@
 package kafka.message
 
 import java.nio._
-import java.nio.channels._
-import java.util.zip.CRC32
-import java.util.UUID
 import kafka.utils._
 import kafka.common.UnknownMagicByteException
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala Tue Jan 24 20:54:11 2012
@@ -26,7 +26,7 @@ import kafka.utils._
  * 
  */
 @nonthreadsafe
-private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive {
+private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging {
   
   private val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4)
   private var contentBuffer: ByteBuffer = null
@@ -81,7 +81,7 @@ private[kafka] class BoundedByteBufferRe
     }
     catch {
       case e: OutOfMemoryError => {
-        logger.error("OOME with size " + size, e)
+        error("OOME with size " + size, e)
         throw e
       }
       case e2 =>

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
 
 package kafka.network
 
-import java.util.ArrayList
 import java.util.concurrent._
 
 object RequestChannel { 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Tue Jan 24 20:54:11 2012
@@ -21,7 +21,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.net._
 import java.io._
-import java.nio._
 import java.nio.channels._
 
 import kafka.utils._

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala Tue Jan 24 20:54:11 2012
@@ -17,8 +17,6 @@
 
 package kafka.network
 
-import java.util.concurrent.atomic._
-import javax.management._
 import kafka.utils._
 import kafka.api.RequestKeys
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Tue Jan 24 20:54:11 2012
@@ -16,7 +16,7 @@
 */
 package kafka.producer
 
-import collection.mutable.Map
+import collection.immutable.Map
 import collection.SortedSet
 import kafka.cluster.{Broker, Partition}
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,7 @@
 package kafka.producer
 
 import collection.mutable.HashMap
-import collection.mutable.Map
+import collection.immutable.Map
 import collection.SortedSet
 import kafka.cluster.{Broker, Partition}
 import kafka.common.InvalidConfigException
@@ -90,7 +90,7 @@ private[producer] class ConfigBrokerPart
       brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),
         brokerIdHostPort(1), brokerIdHostPort(2).toInt))
     }
-    brokerInfo
+    brokerInfo.toMap
   }
 
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala Tue Jan 24 20:54:11 2012
@@ -18,14 +18,10 @@
 package kafka.producer
 
 import scala.collection.JavaConversions._
-import org.I0Itec.zkclient._
 import joptsimple._
-import java.util.Arrays.asList
 import java.util.Properties
-import java.util.Random
 import java.io._
 import kafka.message._
-import kafka.utils._
 import kafka.serializer._
 
 object ConsoleProducer { 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Tue Jan 24 20:54:11 2012
@@ -19,7 +19,7 @@ package kafka.producer
 
 import async.MissingConfigException
 import org.apache.log4j.spi.LoggingEvent
-import org.apache.log4j.{Logger, AppenderSkeleton}
+import org.apache.log4j.AppenderSkeleton
 import kafka.utils.{Utils, Logging}
 import kafka.serializer.Encoder
 import java.util.{Properties, Date}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Tue Jan 24 20:54:11 2012
@@ -19,7 +19,6 @@ package kafka.producer
 
 import kafka.utils.Utils
 import java.util.Properties
-import kafka.message.{CompressionUtils, CompressionCodec}
 
 class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
   /** the broker to which the producer sends events */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala Tue Jan 24 20:54:11 2012
@@ -18,7 +18,7 @@ package kafka.producer
 
 import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
 import collection.mutable.HashMap
-import collection.mutable.Map
+import collection.immutable.Map
 import kafka.utils.Logging
 import collection.immutable.TreeSet
 import kafka.cluster.{Broker, Partition}
@@ -188,13 +188,9 @@ private[producer] class ZKBrokerPartitio
    * @return a mapping from brokerId to (host, port)
    */
   private def getZKBrokerInfo(): Map[Int, Broker] = {
-    val brokers = new HashMap[Int, Broker]()
     val allBrokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).map(bid => bid.toInt)
-    allBrokerIds.foreach { bid =>
-      val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
-      brokers += (bid -> Broker.createBroker(bid, brokerInfo))
-    }
-    brokers
+    val brokers = ZkUtils.getBrokerInfoFromIds(zkClient, allBrokerIds)
+    allBrokerIds.zip(brokers).toMap
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Tue Jan 24 20:54:11 2012
@@ -106,29 +106,26 @@ private[kafka] class AsyncProducer[T](co
 
     if(!added) {
       AsyncProducerStats.recordDroppedEvents
-      logger.error("Event queue is full of unsent messages, could not send event: " + event.toString)
+      error("Event queue is full of unsent messages, could not send event: " + event.toString)
       throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString)
     }else {
-      if(logger.isTraceEnabled) {
-        logger.trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString)
-        logger.trace("Remaining queue size: " + queue.remainingCapacity)
-      }
+      trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString)
+      trace("Remaining queue size: " + queue.remainingCapacity)
     }
   }
 
   def close = {
     if(cbkHandler != null) {
       cbkHandler.close
-      logger.info("Closed the callback handler")
+      info("Closed the callback handler")
     }
     closed.set(true)
     queue.put(new QueueItem(AsyncProducer.Shutdown.asInstanceOf[T], null, -1))
-    if(logger.isDebugEnabled)
-      logger.debug("Added shutdown command to the queue")
+    debug("Added shutdown command to the queue")
     sendThread.shutdown
     sendThread.awaitShutdown
     producer.close
-    logger.info("Closed AsyncProducer")
+   info("Closed AsyncProducer")
   }
 
   // for testing only

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Jan 24 20:54:11 2012
@@ -23,9 +23,11 @@ import kafka.network._
 import kafka.message._
 import kafka.api._
 import kafka.common.ErrorMapping
-import kafka.utils.SystemTime
-import kafka.utils.Logging
 import java.io.IOException
+import kafka.utils.{SystemTime, Logging}
+import collection.mutable.ListBuffer
+import kafka.admin.{CreateTopicCommand, AdminUtils}
+import java.lang.IllegalStateException
 
 /**
  * Logic to handle the various Kafka requests
@@ -42,6 +44,7 @@ class KafkaApis(val logManager: LogManag
         case RequestKeys.MultiFetch => handleMultiFetchRequest(receive)
         case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
         case RequestKeys.Offsets => handleOffsetRequest(receive)
+        case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
         case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
     }
   }
@@ -129,4 +132,38 @@ class KafkaApis(val logManager: LogManag
     val response = new OffsetArraySend(offsets)
     Some(response)
   }
+
+  def handleTopicMetadataRequest(request: Receive): Option[Send] = {
+    val metadataRequest = TopicMetadataRequest.readFrom(request.buffer)
+
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Topic metadata request " + metadataRequest.toString())
+
+    val topicsMetadata = new ListBuffer[TopicMetadata]()
+    val config = logManager.getServerConfig
+    val zkClient = logManager.getZookeeperClient
+    val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+
+    metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
+      val topic = topicAndMetadata._1
+      topicAndMetadata._2 match {
+        case Some(metadata) => topicsMetadata += metadata
+        case None =>
+          /* check if auto creation of topics is turned on */
+          if(config.autoCreateTopics) {
+            CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions,
+              config.defaultReplicationFactor)
+            info("Auto creation of topic %s with partitions %d and replication factor %d is successful!"
+              .format(topic, config.numPartitions, config.defaultReplicationFactor))
+            val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+            newTopicMetadata match {
+              case Some(topicMetadata) => topicsMetadata += topicMetadata
+              case None =>
+                throw new IllegalStateException("Topic metadata for automatically created topic %s does not exist".format(topic))
+            }
+          }
+      }
+    }
+    Some(new TopicMetadataSend(topicsMetadata))
+  }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Tue Jan 24 20:54:11 2012
@@ -79,9 +79,6 @@ class KafkaConfig(props: Properties) ext
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
   
-  /* enable zookeeper registration in the server */
-  val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)
-
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
   val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
 
@@ -93,4 +90,10 @@ class KafkaConfig(props: Properties) ext
 
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
   val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+
+  /* enable auto creation of topic on the server */
+  val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
+
+  /* default replication factors for automatically created topics */
+  val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Tue Jan 24 20:54:11 2012
@@ -17,9 +17,6 @@
 
 package kafka.server
 
-import java.util.concurrent._
-import java.util.concurrent.atomic._
-import org.apache.log4j.Logger
 import kafka.network._
 import kafka.utils._
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Jan 24 20:54:11 2012
@@ -17,13 +17,11 @@
 
 package kafka.server
 
-import scala.reflect.BeanProperty
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.io.File
-import org.apache.log4j.Logger
 import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
-import kafka.network.{SocketServerStats, SocketServer, RequestChannel}
+import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Tue Jan 24 20:54:11 2012
@@ -18,12 +18,11 @@
 package kafka.server
 
 import kafka.utils._
-import kafka.cluster.Broker
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.log.LogManager
 import java.net.InetAddress
+import kafka.common.KafkaZookeeperClient
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -41,7 +40,7 @@ class KafkaZooKeeper(config: KafkaConfig
   def startup() {
     /* start client */
     info("connecting to ZK: " + config.zkConnect)
-    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
     zkClient.subscribeStateChanges(new SessionExpireListener)
   }
 
@@ -49,17 +48,7 @@ class KafkaZooKeeper(config: KafkaConfig
     info("Registering broker " + brokerIdPath)
     val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
     val creatorId = hostName + "-" + System.currentTimeMillis
-    val broker = new Broker(config.brokerId, creatorId, hostName, config.port)
-    try {
-      ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
-    } catch {
-      case e: ZkNodeExistsException =>
-        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + 
-                                   "indicates that you either have configured a brokerid that is already in use, or " + 
-                                   "else you have shutdown this broker and restarted it faster than the zookeeper " + 
-                                   "timeout so it appears to be re-registering.")
-    }
-    info("Registering broker " + brokerIdPath + " succeeded with " + broker)
+    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
   }
 
   def registerTopicInZk(topic: String) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala Tue Jan 24 20:54:11 2012
@@ -17,10 +17,7 @@
 
 package kafka.server
 
-import java.nio._
-import java.nio.channels._
 import kafka.network._
-import kafka.message._
 import kafka.utils._
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala Tue Jan 24 20:54:11 2012
@@ -22,7 +22,7 @@ import java.util.Date
 import java.text.SimpleDateFormat
 import javax.management._
 import javax.management.remote._
-import joptsimple.{OptionSet, OptionParser}
+import joptsimple.OptionParser
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.math._

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala Tue Jan 24 20:54:11 2012
@@ -17,12 +17,9 @@
 
 package kafka.tools
 
-import java.net.URI
 import java.io._
 import joptsimple._
-import kafka.message._
 import kafka.producer._
-import java.util.Properties
 import kafka.utils.Utils
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Tue Jan 24 20:54:11 2012
@@ -17,19 +17,17 @@
 
 package kafka.tools
 
-import java.io.File
 import joptsimple.OptionParser
-import org.apache.log4j.Logger
 import java.util.concurrent.{Executors, CountDownLatch}
 import java.util.Properties
 import kafka.producer.async.DefaultEventHandler
-import kafka.serializer.{DefaultEncoder, StringEncoder}
+import kafka.serializer.DefaultEncoder
 import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
 import kafka.consumer._
-import kafka.utils.{ZKStringSerializer, Utils, Logging}
+import kafka.utils.{ZKStringSerializer, Logging}
 import kafka.api.OffsetRequest
 import org.I0Itec.zkclient._
-import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
+import kafka.message.{CompressionCodec, Message}
 
 object ReplayLogProducer extends Logging {
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Tue Jan 24 20:54:11 2012
@@ -22,7 +22,6 @@ import joptsimple._
 import kafka.api.FetchRequest
 import kafka.utils._
 import kafka.consumer._
-import kafka.server._
 
 /**
  * Command line program to dump out messages to standard out using the simple consumer

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Tue Jan 24 20:54:11 2012
@@ -19,7 +19,6 @@ package kafka.utils
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import kafka.utils._
 
 /**
  * A scheduler for running jobs in the background

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala Tue Jan 24 20:54:11 2012
@@ -17,9 +17,8 @@
 
 package kafka.utils
 
-import scala.math._
 
-/** 
+/**
  * A generic range value with a start and end 
  */
 trait Range {