You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/10/05 00:43:00 UTC

svn commit: r1178997 - in /incubator/kafka/trunk/core/src: main/scala/kafka/producer/ test/scala/unit/kafka/javaapi/producer/ test/scala/unit/kafka/producer/

Author: nehanarkhede
Date: Tue Oct  4 22:43:00 2011
New Revision: 1178997

URL: http://svn.apache.org/viewvc?rev=1178997&view=rev
Log:
KAFKA-129 ZK-based producer can throw an unexpceted exception when sending a message; patched by nehanarkhede; reviewed by junrao

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1178997&r1=1178996&r2=1178997&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Tue Oct  4 22:43:00 2011
@@ -44,6 +44,15 @@ trait BrokerPartitionInfo {
   def getAllBrokerInfo: Map[Int, Broker]
 
   /**
+   * This is relevant to the ZKBrokerPartitionInfo. It updates the ZK cache
+   * by reading from zookeeper and recreating the data structures. This API
+   * is invoked by the producer, when it detects that the ZK cache of
+   * ZKBrokerPartitionInfo is stale.
+   *
+   */
+  def updateInfo
+
+  /**
    * Cleanup
    */
   def close

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala?rev=1178997&r1=1178996&r2=1178997&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala Tue Oct  4 22:43:00 2011
@@ -53,13 +53,15 @@ private[producer] class ConfigBrokerPart
 
   def close {}
 
+  def updateInfo = {}
+
   /**
    * Generate a sequence of (brokerId, numPartitions) for all brokers
    * specified in the producer configuration
    * @return sequence of (brokerId, numPartitions)
    */
   private def getConfigTopicPartitionInfo(): SortedSet[Partition] = {
-    val brokerInfoList = config.brokerPartitionInfo.split(",")
+    val brokerInfoList = config.brokerList.split(",")
     if(brokerInfoList.size == 0) throw new InvalidConfigException("broker.list is empty")
     // check if each individual broker info is valid => (brokerId: brokerHost: brokerPort)
     brokerInfoList.foreach { bInfo =>
@@ -84,7 +86,7 @@ private[producer] class ConfigBrokerPart
    */
   private def getConfigBrokerInfo(): Map[Int, Broker] = {
     val brokerInfo = new HashMap[Int, Broker]()
-    val brokerInfoList = config.brokerPartitionInfo.split(",")
+    val brokerInfoList = config.brokerList.split(",")
     brokerInfoList.foreach{ bInfo =>
       val brokerIdHostPort = bInfo.split(":")
       brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1178997&r1=1178996&r2=1178997&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Tue Oct  4 22:43:00 2011
@@ -23,7 +23,6 @@ import kafka.utils._
 import java.util.Properties
 import kafka.cluster.{Partition, Broker}
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.api.ProducerRequest
 import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
 
 class Producer[K,V](config: ProducerConfig,
@@ -35,9 +34,9 @@ class Producer[K,V](config: ProducerConf
 {
   private val logger = Logger.getLogger(classOf[Producer[K, V]])
   private val hasShutdown = new AtomicBoolean(false)
-  if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
+  if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
-  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerPartitionInfo))
+  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList))
     logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
   private val random = new java.util.Random
   // check if zookeeper based auto partition discovery is enabled
@@ -94,47 +93,86 @@ class Producer[K,V](config: ProducerConf
            partitioner: Partitioner[K]) =
     this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
          new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
+
   /**
    * Sends the data, partitioned by key to the topic using either the
    * synchronous or the asynchronous producer
    * @param producerData the producer data object that encapsulates the topic, key and message data
    */
   def send(producerData: ProducerData[K,V]*) {
+    zkEnabled match {
+      case true => zkSend(producerData: _*)
+      case false => configSend(producerData: _*)
+    }
+  }
+
+  private def zkSend(producerData: ProducerData[K,V]*) {
     val producerPoolRequests = producerData.map { pd =>
-    // find the number of broker partitions registered for this topic
-      logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
-      val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
-      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
-      val totalNumPartitions = numBrokerPartitions.length
-      if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+      var brokerIdPartition: Option[Partition] = None
+      var brokerInfoOpt: Option[Broker] = None
+
+      var numRetries: Int = 0
+      while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) {
+        if(numRetries > 0) {
+          logger.info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again")
+          brokerPartitionInfo.updateInfo
+        }
+
+        val numBrokerPartitions = getNumPartitionsForTopic(pd)
+        val totalNumPartitions = numBrokerPartitions.length
 
-      var brokerIdPartition: Partition = null
-      var partition: Int = 0
-      if(zkEnabled) {
-        // get the partition id
         val partitionId = getPartition(pd.getKey, totalNumPartitions)
-        brokerIdPartition = numBrokerPartitions(partitionId)
-        val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
-        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
-                " on partition " + brokerIdPartition.partId)
-        partition = brokerIdPartition.partId
-      }else {
-        // randomly select a broker
-        val randomBrokerId = random.nextInt(totalNumPartitions)
-        brokerIdPartition = numBrokerPartitions(randomBrokerId)
-        val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
-
-        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
-                " on a randomly chosen partition")
-        partition = ProducerRequest.RandomPartition
+        brokerIdPartition = Some(numBrokerPartitions(partitionId))
+        brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId)
+        numRetries += 1
+      }
+
+      brokerInfoOpt match {
+        case Some(brokerInfo) =>
+          if(logger.isDebugEnabled) logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+                  " on partition " + brokerIdPartition.get.partId)
+        case None =>
+          throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " +
+            pd.getTopic + " and key: " + pd.getKey)
       }
       producerPool.getProducerPoolData(pd.getTopic,
-                                       new Partition(brokerIdPartition.brokerId, partition),
-                                       pd.getData)
+        new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId),
+        pd.getData)
     }
     producerPool.send(producerPoolRequests: _*)
   }
 
+  private def configSend(producerData: ProducerData[K,V]*) {
+    val producerPoolRequests = producerData.map { pd =>
+    // find the broker partitions registered for this topic
+      val numBrokerPartitions = getNumPartitionsForTopic(pd)
+      val totalNumPartitions = numBrokerPartitions.length
+
+      val partitionId = getPartition(pd.getKey, totalNumPartitions)
+      val brokerIdPartition = numBrokerPartitions(partitionId)
+      val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
+
+      if(logger.isDebugEnabled)
+        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " +
+          brokerIdPartition.partId)
+      producerPool.getProducerPoolData(pd.getTopic,
+        new Partition(brokerIdPartition.brokerId, brokerIdPartition.partId),
+        pd.getData)
+    }
+    producerPool.send(producerPoolRequests: _*)
+  }
+
+  private def getNumPartitionsForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
+    if(logger.isDebugEnabled)
+      logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
+    val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+    if(logger.isDebugEnabled)
+      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
+    val totalNumPartitions = numBrokerPartitions.length
+    if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+    numBrokerPartitions
+  }
+
   /**
    * Retrieves the partition id and throws an InvalidPartitionException if
    * the value of partition is not between 0 and numPartitions-1

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1178997&r1=1178996&r2=1178997&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Tue Oct  4 22:43:00 2011
@@ -28,8 +28,8 @@ class ProducerConfig(val props: Properti
   /** For bypassing zookeeper based auto partition discovery, use this config   *
    *  to pass in static broker and per-broker partition information. Format-    *
    *  brokerid1:host1:port1, brokerid2:host2:port2*/
-  val brokerPartitionInfo = Utils.getString(props, "broker.list", null)
-  if(brokerPartitionInfo != null && Utils.getString(props, "partitioner.class", null) != null)
+  val brokerList = Utils.getString(props, "broker.list", null)
+  if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null)
     throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
 
   /** the partitioner class for partitioning events amongst sub-topics */
@@ -58,4 +58,14 @@ class ProducerConfig(val props: Properti
    *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */
   val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
+
+  /**
+   * The producer using the zookeeper software load balancer maintains a ZK cache that gets
+   * updated by the zookeeper watcher listeners. During some events like a broker bounce, the
+   * producer ZK cache can get into an inconsistent state, for a small time period. In this time
+   * period, it could end up picking a broker partition that is unavailable. When this happens, the
+   * ZK cache needs to be updated.
+   * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
+   */
+  val zkReadRetries = Utils.getInt(props, "zk.read.num.retries", 3)
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala?rev=1178997&r1=1178996&r2=1178997&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala Tue Oct  4 22:43:00 2011
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package kafka.producer
 
 import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
@@ -27,6 +27,7 @@ import org.I0Itec.zkclient.{IZkStateList
 import collection.SortedSet
 
 private[producer] object ZKBrokerPartitionInfo {
+
   /**
    * Generate a mapping from broker id to (brokerId, numPartitions) for the list of brokers
    * specified
@@ -49,7 +50,7 @@ private[producer] object ZKBrokerPartiti
       }
     }
     brokerParts
-  }  
+  }
 }
 
 /**
@@ -89,22 +90,24 @@ private[producer] class ZKBrokerPartitio
    * @return a sequence of (brokerId, numPartitions). Returns a zero-length
    * sequence if no brokers are available.
    */
-  def getBrokerPartitionInfo(topic: String): scala.collection.immutable.SortedSet[Partition] = {
-    val brokerPartitions = topicBrokerPartitions.get(topic)
-    var numBrokerPartitions = SortedSet.empty[Partition]
-    brokerPartitions match {
-      case Some(bp) =>
-        bp.size match {
-          case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
-            numBrokerPartitions = bootstrapWithExistingBrokers(topic)
-            topicBrokerPartitions += (topic -> numBrokerPartitions)
-          case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp
-        }
-      case None =>  // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
-        numBrokerPartitions = bootstrapWithExistingBrokers(topic)
-        topicBrokerPartitions += (topic -> numBrokerPartitions)
+  def getBrokerPartitionInfo(topic: String): SortedSet[Partition] = {
+    zkWatcherLock synchronized {
+      val brokerPartitions = topicBrokerPartitions.get(topic)
+      var numBrokerPartitions = SortedSet.empty[Partition]
+      brokerPartitions match {
+        case Some(bp) =>
+          bp.size match {
+            case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+              numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+              topicBrokerPartitions += (topic -> numBrokerPartitions)
+            case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp
+          }
+        case None =>  // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+          numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+          topicBrokerPartitions += (topic -> numBrokerPartitions)
+      }
+      numBrokerPartitions
     }
-    numBrokerPartitions
   }
 
   /**
@@ -113,7 +116,11 @@ private[producer] class ZKBrokerPartitio
    * @param brokerId the broker for which the info is to be returned
    * @return host and port of brokerId
    */
-  def getBrokerInfo(brokerId: Int): Option[Broker] =  allBrokers.get(brokerId)
+  def getBrokerInfo(brokerId: Int): Option[Broker] =  {
+    zkWatcherLock synchronized {
+      allBrokers.get(brokerId)
+    }
+  }
 
   /**
    * Generate a mapping from broker id to the host and port for all brokers
@@ -123,18 +130,28 @@ private[producer] class ZKBrokerPartitio
 
   def close = zkClient.close
 
+  def updateInfo = {
+    zkWatcherLock synchronized {
+      topicBrokerPartitions = getZKTopicPartitionInfo
+      allBrokers = getZKBrokerInfo
+    }
+  }
+
   private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = {
-    logger.debug("Currently, no brokers are registered under topic: " + topic)
-    logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
+    if(logger.isDebugEnabled) logger.debug("Currently, no brokers are registered under topic: " + topic)
+    if(logger.isDebugEnabled)
+      logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
       "number of partitions = 1")
     val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath)
-    logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
+    if(logger.isTraceEnabled)
+      logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
     // since we do not have the in formation about number of partitions on these brokers, just assume single partition
     // i.e. pick partition 0 from each broker as a candidate
     val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0))
     // add the rest of the available brokers with default 1 partition for this topic, so all of the brokers
     // participate in hosting this topic.
-    logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
+    if(logger.isDebugEnabled)
+      logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
     numBrokerPartitions
   }
 
@@ -154,7 +171,8 @@ private[producer] class ZKBrokerPartitio
       val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
       val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions)
       val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
-      logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
 
       var brokerParts = SortedSet.empty[Partition]
       sortedBrokerPartitions.foreach { bp =>
@@ -164,7 +182,8 @@ private[producer] class ZKBrokerPartitio
         }
       }
       brokerPartitionsPerTopic += (topic -> brokerParts)
-      logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
     }
     brokerPartitionsPerTopic
   }
@@ -195,26 +214,35 @@ private[producer] class ZKBrokerPartitio
     private var oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ originalBrokerIdMap
     private val logger = Logger.getLogger(classOf[BrokerTopicsListener])
 
-    logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
-    "/broker/topics, /broker/topics/topic, /broker/ids")
-    logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
+    if(logger.isDebugEnabled)
+      logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
+      "/broker/topics, /broker/topics/topic, /broker/ids")
+    if(logger.isDebugEnabled)
+      logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
       "partition id per topic with " + oldBrokerTopicPartitionsMap.toString)
 
     @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+    def handleChildChange(parentPath : String, currentChildren : java.util.List[String]) {
+      val curChilds: java.util.List[String] = if(currentChildren != null) currentChildren
+                                              else new java.util.ArrayList[String]()
+
       zkWatcherLock synchronized {
-        logger.trace("Watcher fired for path: " + parentPath)
+        if(logger.isTraceEnabled)
+          logger.trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString)
         import scala.collection.JavaConversions._
 
         parentPath match {
           case "/brokers/topics" =>        // this is a watcher for /broker/topics path
             val updatedTopics = asBuffer(curChilds)
-            logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
-              curChilds.toString)
-            logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
-            logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
+            if(logger.isDebugEnabled) {
+              logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
+                curChilds.toString)
+              logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
+              logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
+            }
             val newTopics = updatedTopics.toSet &~ oldBrokerTopicPartitionsMap.keySet
-            logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
+            if(logger.isDebugEnabled)
+              logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
             newTopics.foreach { topic =>
               val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
               val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
@@ -223,15 +251,17 @@ private[producer] class ZKBrokerPartitio
                 brokerTopicsListener)
             }
           case "/brokers/ids"    =>        // this is a watcher for /broker/ids path
-            logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
-              "\t Currently registered list of brokers -> " + curChilds.toString)
+            if(logger.isDebugEnabled)
+              logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
+                "\t Currently registered list of brokers -> " + curChilds.toString)
             processBrokerChange(parentPath, curChilds)
           case _ =>
             val pathSplits = parentPath.split("/")
             val topic = pathSplits.last
             if(pathSplits.length == 4 && pathSplits(2).equals("topics")) {
-              logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
-                " list of brokers -> " + curChilds.toString + " for topic -> " + topic)
+              if(logger.isDebugEnabled)
+                logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
+                  " list of brokers -> " + curChilds.toString + " for topic -> " + topic)
               processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
             }
         }
@@ -247,17 +277,18 @@ private[producer] class ZKBrokerPartitio
         import scala.collection.JavaConversions._
         val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt)
         val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet
-        logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
+        if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
         newBrokers.foreach { bid =>
           val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
           val brokerHostPort = brokerInfo.split(":")
           allBrokers += (bid -> new Broker(bid, brokerHostPort(1), brokerHostPort(1), brokerHostPort(2).toInt))
-          logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
+          if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
           producerCbk(bid, brokerHostPort(1), brokerHostPort(2).toInt)
         }
         // remove dead brokers from the in memory list of live brokers
         val deadBrokers = oldBrokerIdMap.keySet &~ updatedBrokerList.toSet
-        logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
+        if(logger.isDebugEnabled)
+          logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
         deadBrokers.foreach {bid =>
           allBrokers = allBrokers - bid
           // also remove this dead broker from particular topics
@@ -266,7 +297,8 @@ private[producer] class ZKBrokerPartitio
               case Some(oldBrokerPartitionList) =>
                 val aliveBrokerPartitionList = oldBrokerPartitionList.filter(bp => bp.brokerId != bid)
                 topicBrokerPartitions += (topic -> aliveBrokerPartitionList)
-                logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
+                if(logger.isDebugEnabled)
+                  logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
                   "Updated list of broker id, partition id = " + aliveBrokerPartitionList.toString)
               case None =>
             }
@@ -285,19 +317,23 @@ private[producer] class ZKBrokerPartitio
       // find the old list of brokers for this topic
       oldBrokerTopicPartitionsMap.get(topic) match {
         case Some(brokersParts) =>
-          logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
+          if(logger.isDebugEnabled)
+            logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
         case None =>
       }
+
       val updatedBrokerList = curChilds.map(b => b.toInt)
       import ZKBrokerPartitionInfo._
       val updatedBrokerParts:SortedSet[Partition] = getBrokerPartitions(zkClient, topic, updatedBrokerList.toList)
-      logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
-        curChilds.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
+          curChilds.toString)
       // update the number of partitions on existing brokers
       var mergedBrokerParts: SortedSet[Partition] = TreeSet[Partition]() ++ updatedBrokerParts
       topicBrokerPartitions.get(topic) match {
         case Some(oldBrokerParts) =>
-          logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
+          if(logger.isDebugEnabled)
+            logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
             oldBrokerParts.toString)
           mergedBrokerParts = oldBrokerParts ++ updatedBrokerParts
         case None =>
@@ -305,16 +341,24 @@ private[producer] class ZKBrokerPartitio
       // keep only brokers that are alive
       mergedBrokerParts = mergedBrokerParts.filter(bp => allBrokers.contains(bp.brokerId))
       topicBrokerPartitions += (topic -> mergedBrokerParts)
-      logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + mergedBrokerParts.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " +
+          mergedBrokerParts.toString)
     }
 
     def resetState = {
-      logger.debug("[BrokerTopicsListener] Before reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
+      if(logger.isTraceEnabled)
+        logger.trace("[BrokerTopicsListener] Before reseting broker topic partitions state " +
+          oldBrokerTopicPartitionsMap.toString)
       oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions
-      logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
-      logger.debug("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " +
+          oldBrokerTopicPartitionsMap.toString)
+      if(logger.isTraceEnabled)
+        logger.trace("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString)
       oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++  allBrokers
-      logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString)
     }
   }
 
@@ -322,7 +366,8 @@ private[producer] class ZKBrokerPartitio
    * Handles the session expiration event in zookeeper
    */
   class ZKSessionExpirationListener(val brokerTopicsListener: BrokerTopicsListener)
-          extends IZkStateListener {
+    extends IZkStateListener {
+
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
@@ -350,7 +395,7 @@ private[producer] class ZKBrokerPartitio
       // NOTE: this is probably not required here. Since when we read from getZKTopicPartitionInfo() above,
       // it automatically recreates the watchers there itself
       topicBrokerPartitions.keySet.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic,
-                                                brokerTopicsListener))
+        brokerTopicsListener))
       // there is no need to re-register other listeners as they are listening on the child changes of
       // permanent nodes
     }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1178997&r1=1178996&r2=1178997&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Tue Oct  4 22:43:00 2011
@@ -169,10 +169,10 @@ class ProducerTest extends JUnitSuite {
     // 2 sync producers
     val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
     val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
-    // it should send to random partition on broker 1
+    // it should send to partition 0 due to the StaticPartitioner
     val messageList = new java.util.ArrayList[Message]
     messageList.add(new Message("t".getBytes()))
-    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -368,7 +368,7 @@ class ProducerTest extends JUnitSuite {
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -554,7 +554,6 @@ class ProducerTest extends JUnitSuite {
     val messageList = new java.util.ArrayList[Message]
     messageList.add(new Message("test".getBytes()))
     tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
-
     Thread.sleep(500)
 
     val messagesContent = new java.util.ArrayList[String]
@@ -585,7 +584,7 @@ class ProducerTest extends JUnitSuite {
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1178997&r1=1178996&r2=1178997&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Tue Oct  4 22:43:00 2011
@@ -155,8 +155,8 @@ class ProducerTest extends JUnitSuite {
     // 2 sync producers
     val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
     val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
-    // it should send to random partition on broker 1
-    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
+    // it should send to partition 0 due to the StaticPartitioner
+    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -374,7 +374,7 @@ class ProducerTest extends JUnitSuite {
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -610,9 +610,9 @@ class ProducerTest extends JUnitSuite {
     val serverConfig = new KafkaConfig(serverProps) {
       override val numPartitions = 4
     }
+
     val server3 = TestUtils.createServer(serverConfig)
     Thread.sleep(500)
-
     // send a message to the new broker to register it under topic "test-topic"
     val tempProps = new Properties()
     tempProps.put("host", "localhost")
@@ -622,7 +622,6 @@ class ProducerTest extends JUnitSuite {
                                                              messages = new Message("test".getBytes())))
 
     Thread.sleep(500)
-
     producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1")))
     producer.close
 
@@ -648,7 +647,7 @@ class ProducerTest extends JUnitSuite {
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall