You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC

[22/30] git commit: KAFKA-608 and KAFKA-630 getTopicMetadata does not respect producer config settings and fix auto-create topic; patched by Neha Narkhede; reviewed by Jun Rao

KAFKA-608 and KAFKA-630 getTopicMetadata does not respect producer config settings and fix auto-create topic; patched by Neha Narkhede; reviewed by Jun Rao

git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1415909 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ca7b3ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ca7b3ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ca7b3ac

Branch: refs/heads/trunk
Commit: 6ca7b3acfdfe34752c42a0a1a26ea991b8e4c3b1
Parents: 6486fd1
Author: Neha Narkhede <ne...@apache.org>
Authored: Sat Dec 1 02:04:19 2012 +0000
Committer: Neha Narkhede <ne...@apache.org>
Committed: Sat Dec 1 02:04:19 2012 +0000

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala   |    5 +-
 core/src/main/scala/kafka/client/ClientUtils.scala |   29 +++++-
 core/src/main/scala/kafka/cluster/Broker.scala     |    4 +-
 .../scala/kafka/common/TopicExistsException.scala  |   22 ++++
 .../scala/kafka/consumer/ConsoleConsumer.scala     |    4 +
 .../kafka/consumer/ConsumerFetcherManager.scala    |    2 +-
 .../consumer/ZookeeperConsumerConnector.scala      |    2 +-
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    4 +-
 .../main/scala/kafka/producer/ProducerPool.scala   |   11 --
 .../scala/kafka/producer/SyncProducerConfig.scala  |    2 +-
 .../kafka/producer/async/DefaultEventHandler.scala |   39 ++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |   10 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala    |    2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |    4 +-
 .../scala/kafka/perf/ProducerPerformance.scala     |   19 ++--
 .../replication_testsuite/replica_basic_test.py    |    6 +-
 .../testcase_9051/cluster_config.json              |   87 +++-----------
 .../testcase_9051/testcase_9051_properties.json    |   51 +--------
 system_test/testcase_to_run.json                   |    2 +-
 20 files changed, 130 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 40976c5..437a685 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -25,7 +25,8 @@ import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
 import scala.collection.mutable
-import kafka.common.{BrokerNotAvailableException, LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
+import kafka.common._
+import scala.Some
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -82,7 +83,7 @@ object AdminUtils extends Logging {
       ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
-      case e: ZkNodeExistsException => throw new AdministrationException("topic %s already exists".format(topic))
+      case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
       case e2 => throw new AdministrationException(e2.toString)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index e6e7200..cc4df5d 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -6,20 +6,28 @@ import kafka.api._
 import kafka.producer._
 import kafka.common.KafkaException
 import kafka.utils.{Utils, Logging}
+import java.util.Properties
 
 /**
  * Helper functions common to clients (producer, consumer, or admin)
  */
 object ClientUtils extends Logging{
 
-  def fetchTopicMetadata(topics: Set[String], clientId: String, brokers: Seq[Broker]): TopicMetadataResponse = {
+  /**
+   * Used by the producer to send a metadata request since it has access to the ProducerConfig
+   * @param topics The topics for which the metadata needs to be fetched
+   * @param brokers The brokers in the cluster as configured on the producer through broker.list
+   * @param producerConfig The producer's config
+   * @return topic metadata response
+   */
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
     val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     while(i < brokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(clientId + "-FetchTopicMetadata", brokers(i))
+      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
       info("Fetching metadata for topic %s".format(topics))
       try {
         topicMetadataResponse = producer.send(topicMetadataRequest)
@@ -39,7 +47,22 @@ object ClientUtils extends Logging{
     }
     return topicMetadataResponse
   }
-  
+
+  /**
+   * Used by a non-producer client to send a metadata request
+   * @param topics The topics for which the metadata needs to be fetched
+   * @param brokers The brokers in the cluster as configured on the client
+   * @param clientId The client's identifier
+   * @return topic metadata response
+   */
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
+    val props = new Properties()
+    props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
+    props.put("clientid", clientId)
+    val producerConfig = new ProducerConfig(props)
+    fetchTopicMetadata(topics, brokers, producerConfig)
+  }
+
   /**
    * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 03a75f0..6e072bf 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -47,7 +47,9 @@ private[kafka] case class Broker(val id: Int, val creatorId: String, val host: S
   
   override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
 
-  def getZKString(): String = new String(creatorId + ":" + host + ":" + port)
+  def getZkString(): String = new String(creatorId + ":" + host + ":" + port)
+
+  def getConnectionString(): String = new String(host + ":" + port)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/common/TopicExistsException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicExistsException.scala b/core/src/main/scala/kafka/common/TopicExistsException.scala
new file mode 100644
index 0000000..88a084e
--- /dev/null
+++ b/core/src/main/scala/kafka/common/TopicExistsException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class TopicExistsException(message: String) extends RuntimeException(message) {
+  def this() = this(null) 
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 39cd57d..9378357 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -176,6 +176,7 @@ object ConsoleConsumer extends Logging {
       }
     })
 
+    var numMessages = 0L
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
     try {
@@ -188,6 +189,7 @@ object ConsoleConsumer extends Logging {
       for(messageAndTopic <- iter) {
         try {
           formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
+          numMessages += 1
         } catch {
           case e =>
             if (skipMessageOnError)
@@ -198,6 +200,7 @@ object ConsoleConsumer extends Logging {
         if(System.out.checkError()) {
           // This means no one is listening to our output stream any more, time to shutdown
           System.err.println("Unable to write to standard out, closing consumer.")
+          System.err.println("Consumed %d messages".format(numMessages))
           formatter.close()
           connector.shutdown()
           System.exit(1)
@@ -206,6 +209,7 @@ object ConsoleConsumer extends Logging {
     } catch {
       case e => error("Error processing message, stopping consumer: ", e)
     }
+    System.out.println("Consumed %d messages".format(numMessages))
     System.out.flush()
     formatter.close()
     connector.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index a80fac9..a6cbfb6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -54,7 +54,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         try {
           trace("Partitions without leader %s".format(noLeaderPartitionSet))
           val brokers = getAllBrokersInCluster(zkClient)
-          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, config.clientId, brokers).topicsMetadata
+          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId).topicsMetadata
           val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
           topicsMetadata.foreach(
             tmd => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbb82a2..43e9fa6 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -402,7 +402,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val brokers = getAllBrokersInCluster(zkClient)
-      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, config.clientId, brokers).topicsMetadata
+      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
       val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 938504a..d58a063 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 package kafka.producer
 
 import collection.mutable.HashMap
@@ -72,7 +72,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
    */
   def updateInfo(topics: Set[String]) {
     var topicsMetadata: Seq[TopicMetadata] = Nil
-    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, producerConfig.clientId, brokers)
+    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig)
     topicsMetadata = topicMetadataResponse.topicsMetadata
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/producer/ProducerPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index 7e78c7e..4970029 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -37,17 +37,6 @@ object ProducerPool {
     props.putAll(config.props.props)
     new SyncProducer(new SyncProducerConfig(props))
   }
-
-  /**
-   * Used in ClientUtils to send TopicMetadataRequest to a broker.
-   */
-  def createSyncProducer(clientId: String, broker: Broker): SyncProducer = {
-    val props = new Properties()
-    props.put("host", broker.host)
-    props.put("port", broker.port.toString)
-    props.put("client.id", clientId)
-    new SyncProducer(new SyncProducerConfig(props))
-  }
 }
 
 class ProducerPool(val config: ProducerConfig) extends Logging {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index 3e3dc49..f94415a 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -64,5 +64,5 @@ object SyncProducerConfig {
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
   val DefaultRequiredAcks : Short = 0
-  val DefaultAckTimeoutMs = 500
+  val DefaultAckTimeoutMs = 1500
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 645402e..9f3e2ea 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -48,7 +48,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     lock synchronized {
       val serializedData = serialize(events)
       serializedData.foreach{
-        keyed => 
+        keyed =>
           val dataSize = keyed.message.payloadSize
           producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
           producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
@@ -82,7 +82,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         try {
           for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
             if (logger.isTraceEnabled)
-              messagesPerBrokerMap.foreach(partitionAndEvent => 
+              messagesPerBrokerMap.foreach(partitionAndEvent =>
                 trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
             val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
 
@@ -173,7 +173,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     debug("Broker partitions registered for topic: %s are %s"
       .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
-    if(totalNumPartitions == 0) 
+    if(totalNumPartitions == 0)
       throw new NoBrokersForPartitionException("Partition key = " + m.key)
     topicPartitionsList
   }
@@ -189,10 +189,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     if(numPartitions <= 0)
       throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
         "\n Valid values are > 0")
-    val partition = 
-      if(key == null) 
+    val partition =
+      if(key == null)
         Utils.abs(counter.getAndIncrement()) % numPartitions
-      else 
+      else
         partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
@@ -217,11 +217,16 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
-        trace("Producer sent messages for topics %s to broker %d on %s:%d"
+        debug("Producer sent messages for topics %s to broker %d on %s:%d"
           .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
         if (response.status.size != producerRequest.data.size)
           throw new KafkaException("Incomplete response (%s) for producer request (%s)"
-                                           .format(response, producerRequest))
+            .format(response, producerRequest))
+        if (logger.isTraceEnabled) {
+          val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
+          successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
+            trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
+        }
         response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
           .map(partitionStatus => partitionStatus._1)
       } catch {
@@ -236,33 +241,33 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
 
   private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
     /** enforce the compressed.topics config here.
-     *  If the compression codec is anything other than NoCompressionCodec,
-     *    Enable compression only for specified topics if any
-     *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
-     *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
-     */
+      *  If the compression codec is anything other than NoCompressionCodec,
+      *    Enable compression only for specified topics if any
+      *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
+      *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
+      */
 
     val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
       val rawMessages = messages.map(_.message)
       ( topicAndPartition,
         config.compressionCodec match {
           case NoCompressionCodec =>
-            trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
+            debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
             new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
           case _ =>
             config.compressedTopics.size match {
               case 0 =>
-                trace("Sending %d messages with compression codec %d to %s"
+                debug("Sending %d messages with compression codec %d to %s"
                   .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                 new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
               case _ =>
                 if(config.compressedTopics.contains(topicAndPartition.topic)) {
-                  trace("Sending %d messages with compression codec %d to %s"
+                  debug("Sending %d messages with compression codec %d to %s"
                     .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                   new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
                 }
                 else {
-                  trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+                  debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
                     .format(messages.size, topicAndPartition, config.compressedTopics.toString))
                   new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cc04ed5..b3dc79d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -434,9 +434,13 @@ class KafkaApis(val requestChannel: RequestChannel,
             try {
               /* check if auto creation of topics is turned on */
               if (config.autoCreateTopics) {
-                CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
-                info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                             .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+                try {
+                  CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+                  info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                               .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+                } catch {
+                  case e: TopicExistsException => // let it go, possibly another broker created this topic
+                }
                 val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicAndMetadata.topic, zkClient)
                 topicsMetadata += newTopicMetadata
                 newTopicMetadata.errorCode match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index addd8db..1842c03 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -125,7 +125,7 @@ object SimpleConsumerShell extends Logging {
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), clientId, metadataTargetBrokers).topicsMetadata
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4333401..8e40f2b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -184,7 +184,7 @@ object ZkUtils extends Logging {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
     try {
-      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString)
     } catch {
       case e: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 17cabc7..e1f68dd 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ZkUtils, TestUtils}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
 
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -170,7 +170,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
       AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
       fail("shouldn't be able to create a topic already exists")
     } catch {
-      case e: AdministrationException => // this is good
+      case e: TopicExistsException => // this is good
       case e2 => throw e2
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index a270289..a9a5f07 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -126,11 +126,6 @@ object ProducerPerformance extends Logging {
       .withRequiredArg()
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(-1)
-    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
             "set, the csv metrics will be outputed here")
@@ -223,6 +218,7 @@ object ProducerPerformance extends Logging {
     private val threadIdLabel  = "ThreadID"
     private val topicLabel     = "Topic"
     private var leftPaddedSeqId : String = ""
+
     private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
       // Each thread gets a unique range of sequential no. for its ids.
       // Eg. 1000 msg in 10 threads => 100 msg per thread
@@ -246,12 +242,13 @@ object ProducerPerformance extends Logging {
 
     private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = {
       val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
-      val message = if(config.seqIdMode) {
-        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
-        generateMessageWithSeqId(topic, seqId, msgSize)
-      } else {
-        new Array[Byte](msgSize)
-      }
+      val message =
+        if(config.seqIdMode) {
+          val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
+          generateMessageWithSeqId(topic, seqId, msgSize)
+        } else {
+          new Array[Byte](msgSize)
+        }
       (new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length)
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index 6f2e15a..8a6521e 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -392,7 +392,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                 if logRetentionTest.lower() == "false":
                     self.log_message("starting consumer in the background")
                     kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
-                    time.sleep(1)
+                    time.sleep(10)
                     
                 # =============================================
                 # this testcase is completed - stop all entities
@@ -421,11 +421,11 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                     kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
                 elif consumerMultiTopicsMode.lower() == "true":
-                    kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+                    #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv)
                 else:
                     kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
-                    kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+                    #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
                     kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
 
                 # =============================================

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/system_test/replication_testsuite/testcase_9051/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_9051/cluster_config.json b/system_test/replication_testsuite/testcase_9051/cluster_config.json
index 7103040..7bef92b 100644
--- a/system_test/replication_testsuite/testcase_9051/cluster_config.json
+++ b/system_test/replication_testsuite/testcase_9051/cluster_config.json
@@ -2,105 +2,56 @@
     "cluster_config": [
         {
             "entity_id": "0",
-            "hostname": "localhost",
+            "hostname": "esv4-app18.corp",
             "role": "zookeeper",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9900"
         },
-
-
         {
             "entity_id": "1",
-            "hostname": "localhost",
+            "hostname": "esv4-app18.corp",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9901"
         },
         {
             "entity_id": "2",
-            "hostname": "localhost",
+            "hostname": "esv4-app19.corp",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9902"
         },
         {
             "entity_id": "3",
-            "hostname": "localhost",
+            "hostname": "esv4-app20.corp",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9903"
         },
         {
             "entity_id": "4",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9904"
-        },
-        {
-            "entity_id": "5",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9905"
-        },
-        {
-            "entity_id": "6",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9906"
-        },
-        {
-            "entity_id": "7",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9907"
-        },
-        {
-            "entity_id": "8",
-            "hostname": "localhost",
-            "role": "broker",
-            "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
-            "jmx_port": "9908"
-        },
-
-
-        {
-            "entity_id": "9",
-            "hostname": "localhost",
+            "hostname": "esv4-app18.corp",
             "role": "producer_performance",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9909"
         },
         {
-            "entity_id": "10",
-            "hostname": "localhost",
+            "entity_id": "5",
+            "hostname": "esv4-app18.corp",
             "role": "console_consumer",
             "cluster_name": "source",
-            "kafka_home": "default",
-            "java_home": "default",
+            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
             "jmx_port": "9910"
         }
     ]

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json b/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
index 84e5789..9a99c39 100644
--- a/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
+++ b/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
@@ -13,7 +13,7 @@
   "testcase_args": {
     "broker_type": "leader",
     "bounce_broker": "false",
-    "replica_factor": "8",
+    "replica_factor": "3",
     "num_partition": "2",
     "num_iteration": "1",
     "producer_multi_topics_mode": "true",
@@ -59,51 +59,6 @@
     },
     {
       "entity_id": "4",
-      "port": "9094",
-      "brokerid": "4",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_4_logs",
-      "log_filename": "kafka_server_9094.log",
-      "config_filename": "kafka_server_9094.properties"
-    },
-    {
-      "entity_id": "5",
-      "port": "9095",
-      "brokerid": "5",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_5_logs",
-      "log_filename": "kafka_server_9095.log",
-      "config_filename": "kafka_server_9095.properties"
-    },
-    {
-      "entity_id": "6",
-      "port": "9096",
-      "brokerid": "6",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_6_logs",
-      "log_filename": "kafka_server_9096.log",
-      "config_filename": "kafka_server_9096.properties"
-    },
-    {
-      "entity_id": "7",
-      "port": "9097",
-      "brokerid": "7",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_7_logs",
-      "log_filename": "kafka_server_9097.log",
-      "config_filename": "kafka_server_9097.properties"
-    },
-    {
-      "entity_id": "8",
-      "port": "9098",
-      "brokerid": "8",
-      "log.file.size": "1048576",
-      "log.dir": "/tmp/kafka_server_8_logs",
-      "log_filename": "kafka_server_9098.log",
-      "config_filename": "kafka_server_9098.properties"
-    },
-    {
-      "entity_id": "9",
       "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
       "threads": "5",
       "compression-codec": "0",
@@ -111,13 +66,13 @@
       "message": "500",
       "request-num-acks": "-1",
       "producer-retry-backoff-ms": "3500",
-      "producer-num-retries": "5",
+      "producer-num-retries": "3",
       "async":"false",
       "log_filename": "producer_performance_9.log",
       "config_filename": "producer_performance_9.properties"
     },
     {
-      "entity_id": "10",
+      "entity_id": "5",
       "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
       "groupid": "mytestgroup",
       "consumer-timeout-ms": "60000",

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca7b3ac/system_test/testcase_to_run.json
----------------------------------------------------------------------
diff --git a/system_test/testcase_to_run.json b/system_test/testcase_to_run.json
index c6cf17e..9dfa0ec 100644
--- a/system_test/testcase_to_run.json
+++ b/system_test/testcase_to_run.json
@@ -1,5 +1,5 @@
 {
     "ReplicaBasicTest"   : [
-        "testcase_1"
+        "testcase_9051"
     ]
 }