You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/12/01 03:04:29 UTC

svn commit: r1415909 - in /kafka/branches/0.8: core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/client/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/produce...

Author: nehanarkhede
Date: Sat Dec  1 02:04:19 2012
New Revision: 1415909

URL: http://svn.apache.org/viewvc?rev=1415909&view=rev
Log:
KAFKA-608 and KAFKA-630 getTopicMetadata does not respect producer config settings and fix auto-create topic; patched by Neha Narkhede; reviewed by Jun Rao

Added:
    kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala
Modified:
    kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala
    kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
    kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
    kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
    kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
    kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json
    kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
    kafka/branches/0.8/system_test/testcase_to_run.json

Modified: kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Sat Dec  1 02:04:19 2012
@@ -25,7 +25,8 @@ import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
 import scala.collection.mutable
-import kafka.common.{BrokerNotAvailableException, LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
+import kafka.common._
+import scala.Some
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -82,7 +83,7 @@ object AdminUtils extends Logging {
       ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
-      case e: ZkNodeExistsException => throw new AdministrationException("topic %s already exists".format(topic))
+      case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
       case e2 => throw new AdministrationException(e2.toString)
     }
   }

Modified: kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala Sat Dec  1 02:04:19 2012
@@ -6,20 +6,28 @@ import kafka.api._
 import kafka.producer._
 import kafka.common.KafkaException
 import kafka.utils.{Utils, Logging}
+import java.util.Properties
 
 /**
  * Helper functions common to clients (producer, consumer, or admin)
  */
 object ClientUtils extends Logging{
 
-  def fetchTopicMetadata(topics: Set[String], clientId: String, brokers: Seq[Broker]): TopicMetadataResponse = {
+  /**
+   * Used by the producer to send a metadata request since it has access to the ProducerConfig
+   * @param topics The topics for which the metadata needs to be fetched
+   * @param brokers The brokers in the cluster as configured on the producer through broker.list
+   * @param producerConfig The producer's config
+   * @return topic metadata response
+   */
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
     val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     while(i < brokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(clientId + "-FetchTopicMetadata", brokers(i))
+      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
       info("Fetching metadata for topic %s".format(topics))
       try {
         topicMetadataResponse = producer.send(topicMetadataRequest)
@@ -39,7 +47,22 @@ object ClientUtils extends Logging{
     }
     return topicMetadataResponse
   }
-  
+
+  /**
+   * Used by a non-producer client to send a metadata request
+   * @param topics The topics for which the metadata needs to be fetched
+   * @param brokers The brokers in the cluster as configured on the client
+   * @param clientId The client's identifier
+   * @return topic metadata response
+   */
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
+    val props = new Properties()
+    props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
+    props.put("clientid", clientId)
+    val producerConfig = new ProducerConfig(props)
+    fetchTopicMetadata(topics, brokers, producerConfig)
+  }
+
   /**
    * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
    */

Modified: kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Sat Dec  1 02:04:19 2012
@@ -47,7 +47,9 @@ private[kafka] case class Broker(val id:
   
   override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
 
-  def getZKString(): String = new String(creatorId + ":" + host + ":" + port)
+  def getZkString(): String = new String(creatorId + ":" + host + ":" + port)
+
+  def getConnectionString(): String = new String(host + ":" + port)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)

Added: kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala?rev=1415909&view=auto
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala (added)
+++ kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala Sat Dec  1 02:04:19 2012
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class TopicExistsException(message: String) extends RuntimeException(message) {
+  def this() = this(null) 
+}
\ No newline at end of file

Modified: kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Sat Dec  1 02:04:19 2012
@@ -176,6 +176,7 @@ object ConsoleConsumer extends Logging {
       }
     })
 
+    var numMessages = 0L
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
     try {
@@ -188,6 +189,7 @@ object ConsoleConsumer extends Logging {
       for(messageAndTopic <- iter) {
         try {
           formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
+          numMessages += 1
         } catch {
           case e =>
             if (skipMessageOnError)
@@ -198,6 +200,7 @@ object ConsoleConsumer extends Logging {
         if(System.out.checkError()) {
           // This means no one is listening to our output stream any more, time to shutdown
           System.err.println("Unable to write to standard out, closing consumer.")
+          System.err.println("Consumed %d messages".format(numMessages))
           formatter.close()
           connector.shutdown()
           System.exit(1)
@@ -206,6 +209,7 @@ object ConsoleConsumer extends Logging {
     } catch {
       case e => error("Error processing message, stopping consumer: ", e)
     }
+    System.out.println("Consumed %d messages".format(numMessages))
     System.out.flush()
     formatter.close()
     connector.shutdown()

Modified: kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Sat Dec  1 02:04:19 2012
@@ -54,7 +54,7 @@ class ConsumerFetcherManager(private val
         try {
           trace("Partitions without leader %s".format(noLeaderPartitionSet))
           val brokers = getAllBrokersInCluster(zkClient)
-          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, config.clientId, brokers).topicsMetadata
+          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId).topicsMetadata
           val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
           topicsMetadata.foreach(
             tmd => {

Modified: kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Sat Dec  1 02:04:19 2012
@@ -402,7 +402,7 @@ private[kafka] class ZookeeperConsumerCo
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val brokers = getAllBrokersInCluster(zkClient)
-      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, config.clientId, brokers).topicsMetadata
+      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
       val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{

Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Sat Dec  1 02:04:19 2012
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 package kafka.producer
 
 import collection.mutable.HashMap
@@ -72,7 +72,7 @@ class BrokerPartitionInfo(producerConfig
    */
   def updateInfo(topics: Set[String]) {
     var topicsMetadata: Seq[TopicMetadata] = Nil
-    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, producerConfig.clientId, brokers)
+    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig)
     topicsMetadata = topicMetadataResponse.topicsMetadata
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{

Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Sat Dec  1 02:04:19 2012
@@ -37,17 +37,6 @@ object ProducerPool {
     props.putAll(config.props.props)
     new SyncProducer(new SyncProducerConfig(props))
   }
-
-  /**
-   * Used in ClientUtils to send TopicMetadataRequest to a broker.
-   */
-  def createSyncProducer(clientId: String, broker: Broker): SyncProducer = {
-    val props = new Properties()
-    props.put("host", broker.host)
-    props.put("port", broker.port.toString)
-    props.put("client.id", clientId)
-    new SyncProducer(new SyncProducerConfig(props))
-  }
 }
 
 class ProducerPool(val config: ProducerConfig) extends Logging {

Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Sat Dec  1 02:04:19 2012
@@ -64,5 +64,5 @@ object SyncProducerConfig {
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
   val DefaultRequiredAcks : Short = 0
-  val DefaultAckTimeoutMs = 500
+  val DefaultAckTimeoutMs = 1500
 }
\ No newline at end of file

Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Dec  1 02:04:19 2012
@@ -48,7 +48,7 @@ class DefaultEventHandler[K,V](config: P
     lock synchronized {
       val serializedData = serialize(events)
       serializedData.foreach{
-        keyed => 
+        keyed =>
           val dataSize = keyed.message.payloadSize
           producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
           producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
@@ -82,7 +82,7 @@ class DefaultEventHandler[K,V](config: P
         try {
           for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
             if (logger.isTraceEnabled)
-              messagesPerBrokerMap.foreach(partitionAndEvent => 
+              messagesPerBrokerMap.foreach(partitionAndEvent =>
                 trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
             val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
 
@@ -173,7 +173,7 @@ class DefaultEventHandler[K,V](config: P
     debug("Broker partitions registered for topic: %s are %s"
       .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
-    if(totalNumPartitions == 0) 
+    if(totalNumPartitions == 0)
       throw new NoBrokersForPartitionException("Partition key = " + m.key)
     topicPartitionsList
   }
@@ -189,10 +189,10 @@ class DefaultEventHandler[K,V](config: P
     if(numPartitions <= 0)
       throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
         "\n Valid values are > 0")
-    val partition = 
-      if(key == null) 
+    val partition =
+      if(key == null)
         Utils.abs(counter.getAndIncrement()) % numPartitions
-      else 
+      else
         partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
@@ -217,11 +217,16 @@ class DefaultEventHandler[K,V](config: P
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
-        trace("Producer sent messages for topics %s to broker %d on %s:%d"
+        debug("Producer sent messages for topics %s to broker %d on %s:%d"
           .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
         if (response.status.size != producerRequest.data.size)
           throw new KafkaException("Incomplete response (%s) for producer request (%s)"
-                                           .format(response, producerRequest))
+            .format(response, producerRequest))
+        if (logger.isTraceEnabled) {
+          val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
+          successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
+            trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
+        }
         response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
           .map(partitionStatus => partitionStatus._1)
       } catch {
@@ -236,33 +241,33 @@ class DefaultEventHandler[K,V](config: P
 
   private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
     /** enforce the compressed.topics config here.
-     *  If the compression codec is anything other than NoCompressionCodec,
-     *    Enable compression only for specified topics if any
-     *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
-     *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
-     */
+      *  If the compression codec is anything other than NoCompressionCodec,
+      *    Enable compression only for specified topics if any
+      *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
+      *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
+      */
 
     val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
       val rawMessages = messages.map(_.message)
       ( topicAndPartition,
         config.compressionCodec match {
           case NoCompressionCodec =>
-            trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
+            debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
             new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
           case _ =>
             config.compressedTopics.size match {
               case 0 =>
-                trace("Sending %d messages with compression codec %d to %s"
+                debug("Sending %d messages with compression codec %d to %s"
                   .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                 new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
               case _ =>
                 if(config.compressedTopics.contains(topicAndPartition.topic)) {
-                  trace("Sending %d messages with compression codec %d to %s"
+                  debug("Sending %d messages with compression codec %d to %s"
                     .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                   new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
                 }
                 else {
-                  trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+                  debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
                     .format(messages.size, topicAndPartition, config.compressedTopics.toString))
                   new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
                 }

Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Dec  1 02:04:19 2012
@@ -434,9 +434,13 @@ class KafkaApis(val requestChannel: Requ
             try {
               /* check if auto creation of topics is turned on */
               if (config.autoCreateTopics) {
-                CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
-                info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                             .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+                try {
+                  CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+                  info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                               .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+                } catch {
+                  case e: TopicExistsException => // let it go, possibly another broker created this topic
+                }
                 val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicAndMetadata.topic, zkClient)
                 topicsMetadata += newTopicMetadata
                 newTopicMetadata.errorCode match {

Modified: kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Sat Dec  1 02:04:19 2012
@@ -125,7 +125,7 @@ object SimpleConsumerShell extends Loggi
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), clientId, metadataTargetBrokers).topicsMetadata
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)

Modified: kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Sat Dec  1 02:04:19 2012
@@ -184,7 +184,7 @@ object ZkUtils extends Logging {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
     try {
-      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString)
     } catch {
       case e: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")

Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Sat Dec  1 02:04:19 2012
@@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ZkUtils, TestUtils}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
 
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -170,7 +170,7 @@ class AdminTest extends JUnit3Suite with
       AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
       fail("shouldn't be able to create a topic already exists")
     } catch {
-      case e: AdministrationException => // this is good
+      case e: TopicExistsException => // this is good
       case e2 => throw e2
     }
   }

Modified: kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Sat Dec  1 02:04:19 2012
@@ -126,11 +126,6 @@ object ProducerPerformance extends Loggi
       .withRequiredArg()
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(-1)
-    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
             "set, the csv metrics will be outputed here")
@@ -223,6 +218,7 @@ object ProducerPerformance extends Loggi
     private val threadIdLabel  = "ThreadID"
     private val topicLabel     = "Topic"
     private var leftPaddedSeqId : String = ""
+
     private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
       // Each thread gets a unique range of sequential no. for its ids.
       // Eg. 1000 msg in 10 threads => 100 msg per thread
@@ -246,12 +242,13 @@ object ProducerPerformance extends Loggi
 
     private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = {
       val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
-      val message = if(config.seqIdMode) {
-        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
-        generateMessageWithSeqId(topic, seqId, msgSize)
-      } else {
-        new Array[Byte](msgSize)
-      }
+      val message =
+        if(config.seqIdMode) {
+          val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
+          generateMessageWithSeqId(topic, seqId, msgSize)
+        } else {
+          new Array[Byte](msgSize)
+        }
       (new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length)
     }
 

Modified: kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Sat Dec  1 02:04:19 2012
@@ -392,7 +392,7 @@ class ReplicaBasicTest(ReplicationUtils,
                 if logRetentionTest.lower() == "false":
                     self.log_message("starting consumer in the background")
                     kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
-                    time.sleep(1)
+                    time.sleep(10)
                     
                 # =============================================
                 # this testcase is completed - stop all entities
@@ -421,11 +421,11 @@ class ReplicaBasicTest(ReplicationUtils,
                     kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
                 elif consumerMultiTopicsMode.lower() == "true":
-                    kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+                    #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv)
                 else:
                     kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
-                    kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+                    #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
 
                 # =============================================

Modified: kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json (original)
+++ kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json Sat Dec  1 02:04:19 2012
@@ -2,105 +2,56 @@
     "cluster_config": [
         {
             "entity_id": "0",
-            "hostname": "localhost",
+            "hostname": "esv4-app18.corp",
             "role": "zookeeper",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9900"
         },
-
-
         {
             "entity_id": "1",
-            "hostname": "localhost",
+            "hostname": "esv4-app18.corp",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9901"
         },
         {
             "entity_id": "2",
-            "hostname": "localhost",
+            "hostname": "esv4-app19.corp",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9902"
         },
         {
             "entity_id": "3",
-            "hostname": "localhost",
+            "hostname": "esv4-app20.corp",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9903"
         },
         {
             "entity_id": "4",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9904"
-        },
-        {
-            "entity_id": "5",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9905"
-        },
-        {
-            "entity_id": "6",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9906"
-        },
-        {
-            "entity_id": "7",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9907"
-        },
-        {
-            "entity_id": "8",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9908"
-        },
-
-
-        {
-            "entity_id": "9",
-            "hostname": "localhost",
+            "hostname": "esv4-app18.corp",
             "role": "producer_performance",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9909"
         },
         {
-            "entity_id": "10",
-            "hostname": "localhost",
+            "entity_id": "5",
+            "hostname": "esv4-app18.corp",
             "role": "console_consumer",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9910"
         }
     ]

Modified: kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json (original)
+++ kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json Sat Dec  1 02:04:19 2012
@@ -13,7 +13,7 @@
   "testcase_args": {
     "broker_type": "leader",
     "bounce_broker": "false",
-    "replica_factor": "8",
+    "replica_factor": "3",
     "num_partition": "2",
     "num_iteration": "1",
     "producer_multi_topics_mode": "true",
@@ -59,51 +59,6 @@
     },
     {
       "entity_id": "4",
-      "port": "9094",
-      "brokerid": "4",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_4_logs",
-      "log_filename": "kafka_server_9094.log",
-      "config_filename": "kafka_server_9094.properties"
-    },
-    {
-      "entity_id": "5",
-      "port": "9095",
-      "brokerid": "5",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_5_logs",
-      "log_filename": "kafka_server_9095.log",
-      "config_filename": "kafka_server_9095.properties"
-    },
-    {
-      "entity_id": "6",
-      "port": "9096",
-      "brokerid": "6",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_6_logs",
-      "log_filename": "kafka_server_9096.log",
-      "config_filename": "kafka_server_9096.properties"
-    },
-    {
-      "entity_id": "7",
-      "port": "9097",
-      "brokerid": "7",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_7_logs",
-      "log_filename": "kafka_server_9097.log",
-      "config_filename": "kafka_server_9097.properties"
-    },
-    {
-      "entity_id": "8",
-      "port": "9098",
-      "brokerid": "8",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_8_logs",
-      "log_filename": "kafka_server_9098.log",
-      "config_filename": "kafka_server_9098.properties"
-    },
-    {
-      "entity_id": "9",
       "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
       "threads": "5",
       "compression-codec": "0",
@@ -111,13 +66,13 @@
       "message": "500",
       "request-num-acks": "-1",
       "producer-retry-backoff-ms": "3500",
-      "producer-num-retries": "5",
+      "producer-num-retries": "3",
       "async":"false",
       "log_filename": "producer_performance_9.log",
       "config_filename": "producer_performance_9.properties"
     },
     {
-      "entity_id": "10",
+      "entity_id": "5",
       "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
       "groupid": "mytestgroup",
       "consumer-timeout-ms": "60000",

Modified: kafka/branches/0.8/system_test/testcase_to_run.json
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/testcase_to_run.json?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/testcase_to_run.json (original)
+++ kafka/branches/0.8/system_test/testcase_to_run.json Sat Dec  1 02:04:19 2012
@@ -1,5 +1,5 @@
 {
     "ReplicaBasicTest"   : [
-        "testcase_1"
+        "testcase_9051"
     ]
 }