You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/07/24 20:13:04 UTC

svn commit: r1365199 [2/3] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/sca...

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Tue Jul 24 18:13:01 2012
@@ -50,7 +50,8 @@ class DefaultEventHandler[K,V](config: P
         if (outstandingProduceRequests.size > 0)  {
           // back off and update the topic metadata cache before attempting another send operation
           Thread.sleep(config.producerRetryBackoffMs)
-          Utils.swallowError(brokerPartitionInfo.updateInfo())
+          // get topics of the outstanding produce requests and refresh metadata for those
+          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
           remainingRetries -= 1
         }
       }
@@ -62,66 +63,77 @@ class DefaultEventHandler[K,V](config: P
   }
 
   private def dispatchSerializedData(messages: Seq[ProducerData[K,Message]]): Seq[ProducerData[K, Message]] = {
-    val partitionedData = partitionAndCollate(messages)
-    val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
-    try {
-      for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
-        if (logger.isTraceEnabled)
-          eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
-            .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-        val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
-
-        val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-        for( (topic, partition) <- failedTopicPartitions ) {
-          eventsPerBrokerMap.get((topic, partition)) match {
-            case Some(data) => failedProduceRequests.appendAll(data)
-            case None => // nothing
+    val partitionedDataOpt = partitionAndCollate(messages)
+    partitionedDataOpt match {
+      case Some(partitionedData) =>
+        val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
+        try {
+          for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
+            if (logger.isTraceEnabled)
+              eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
+                .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+            val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
+
+            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+            for( (topic, partition) <- failedTopicPartitions ) {
+              eventsPerBrokerMap.get((topic, partition)) match {
+                case Some(data) => failedProduceRequests.appendAll(data)
+                case None => // nothing
+              }
+            }
           }
+        } catch {
+          case t: Throwable => error("Failed to send messages")
         }
-      }
-    } catch {
-      case t: Throwable => error("Failed to send messages")
+        failedProduceRequests
+      case None => // all produce requests failed
+        messages
     }
-    failedProduceRequests
   }
 
   def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
     events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m))))
   }
 
-  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] = {
+  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = {
     val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]
-    for (event <- events) {
-      val topicPartitionsList = getPartitionListForTopic(event)
-      val totalNumPartitions = topicPartitionsList.length
-
-      val partitionIndex = getPartition(event.getKey, totalNumPartitions)
-      val brokerPartition = topicPartitionsList(partitionIndex)
-
-      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
-      val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
-
-      var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
-      ret.get(leaderBrokerId) match {
-        case Some(element) =>
-          dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
-        case None =>
-          dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
-          ret.put(leaderBrokerId, dataPerBroker)
-      }
+    try {
+      for (event <- events) {
+        val topicPartitionsList = getPartitionListForTopic(event)
+        val totalNumPartitions = topicPartitionsList.length
+
+        val partitionIndex = getPartition(event.getKey, totalNumPartitions)
+        val brokerPartition = topicPartitionsList(partitionIndex)
+
+        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
+        val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
+
+        var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
+        ret.get(leaderBrokerId) match {
+          case Some(element) =>
+            dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
+          case None =>
+            dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
+            ret.put(leaderBrokerId, dataPerBroker)
+        }
 
-      val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
-      var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
-      dataPerBroker.get(topicAndPartition) match {
-        case Some(element) =>
-          dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
-        case None =>
-          dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
-          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
+        val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
+        var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
+        dataPerBroker.get(topicAndPartition) match {
+          case Some(element) =>
+            dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
+          case None =>
+            dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
+            dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
+        }
+        dataPerTopicPartition.append(event)
       }
-      dataPerTopicPartition.append(event)
+      Some(ret)
+    }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
+      case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None
+      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None
+      case oe => error("Failed to collate messages by topic, partition due to", oe); throw oe
     }
-    ret
   }
 
   private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = {
@@ -144,12 +156,12 @@ class DefaultEventHandler[K,V](config: P
   private def getPartition(key: K, numPartitions: Int): Int = {
     if(numPartitions <= 0)
       throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
-              "\n Valid values are > 0")
+        "\n Valid values are > 0")
     val partition = if(key == null) Utils.getNextRandomInt(numPartitions)
-                    else partitioner.partition(key, numPartitions)
+    else partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new InvalidPartitionException("Invalid partition id : " + partition +
-              "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
+        "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
     partition
   }
 
@@ -171,7 +183,8 @@ class DefaultEventHandler[K,V](config: P
         partitionData.append(new PartitionData(partitionId, messagesSet))
       }
       val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
-      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeoutMs, topicData)
+      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
+        config.requestTimeoutMs, topicData)
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
@@ -204,46 +217,43 @@ class DefaultEventHandler[K,V](config: P
      */
 
     val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e =>
-      {
-        val topicAndPartition = e._1
-        val produceData = e._2
-        val messages = new ListBuffer[Message]
-        produceData.map(p => messages.appendAll(p.getData))
-
-        ( topicAndPartition,
-          config.compressionCodec match {
-            case NoCompressionCodec =>
-              trace("Sending %d messages with no compression to topic %s on partition %d"
-                  .format(messages.size, topicAndPartition._1, topicAndPartition._2))
-              new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
-            case _ =>
-              config.compressedTopics.size match {
-                case 0 =>
+      val topicAndPartition = e._1
+      val produceData = e._2
+      val messages = new ListBuffer[Message]
+      produceData.map(p => messages.appendAll(p.getData))
+      ( topicAndPartition,
+        config.compressionCodec match {
+          case NoCompressionCodec =>
+            trace("Sending %d messages with no compression to topic %s on partition %d"
+              .format(messages.size, topicAndPartition._1, topicAndPartition._2))
+            new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+          case _ =>
+            config.compressedTopics.size match {
+              case 0 =>
+                trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+                  .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+                new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+              case _ =>
+                if(config.compressedTopics.contains(topicAndPartition._1)) {
                   trace("Sending %d messages with compression codec %d to topic %s on partition %d"
-                      .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+                    .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
                   new ByteBufferMessageSet(config.compressionCodec, messages: _*)
-                case _ =>
-                  if(config.compressedTopics.contains(topicAndPartition._1)) {
-                    trace("Sending %d messages with compression codec %d to topic %s on partition %d"
-                        .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
-                    new ByteBufferMessageSet(config.compressionCodec, messages: _*)
-                  }
-                  else {
-                    trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
-                        .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
-                        config.compressedTopics.toString))
-                    new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
-                  }
-              }
-          }
+                }
+                else {
+                  trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
+                    .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
+                    config.compressedTopics.toString))
+                  new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+                }
+            }
+        }
         )
-      }
     }
     messagesPerTopicPartition
   }
 
   def close() {
     if (producerPool != null)
-      producerPool.close    
+      producerPool.close
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Jul 24 18:13:01 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -30,7 +30,6 @@ import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
 import scala.math._
-import java.lang.IllegalStateException
 import kafka.network.RequestChannel.Response
 
 /**
@@ -48,8 +47,8 @@ class KafkaApis(val requestChannel: Requ
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
-  def handle(request: RequestChannel.Request) { 
-    val apiId = request.request.buffer.getShort() 
+  def handle(request: RequestChannel.Request) {
+    val apiId = request.request.buffer.getShort()
     apiId match {
       case RequestKeys.Produce => handleProducerRequest(request)
       case RequestKeys.Fetch => handleFetchRequest(request)
@@ -57,7 +56,7 @@ class KafkaApis(val requestChannel: Requ
       case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request)
       case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request)
       case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request)
-      case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
+      case _ => throw new KafkaException("No mapping found for handler id " + apiId)
     }
   }
 
@@ -113,6 +112,7 @@ class KafkaApis(val requestChannel: Requ
     val sTime = SystemTime.milliseconds
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Producer request " + request.toString)
+    trace("Broker %s received produce request %s".format(logManager.config.brokerId, produceRequest.toString))
 
     val response = produceToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
@@ -172,7 +172,7 @@ class KafkaApis(val requestChannel: Requ
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
           replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition)
-          offsets(msgIndex) = log.nextAppendOffset
+          offsets(msgIndex) = log.logEndOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
         } catch {
@@ -201,7 +201,7 @@ class KafkaApis(val requestChannel: Requ
     val fetchRequest = FetchRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Fetch request " + fetchRequest.toString)
-
+    trace("Broker %s received fetch request %s".format(logManager.config.brokerId, fetchRequest.toString))
     // validate the request
     try {
       fetchRequest.validate()
@@ -243,7 +243,7 @@ class KafkaApis(val requestChannel: Requ
       fetchRequestPurgatory.watch(delayedFetch)
     }
   }
-    
+
   /**
    * Calculate the number of available bytes for the given fetch request
    */
@@ -255,7 +255,7 @@ class KafkaApis(val requestChannel: Requ
           debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
           val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
           val available = maybeLog match {
-            case Some(log) => max(0, log.highwaterMark - offsetDetail.offsets(i))
+            case Some(log) => max(0, log.logEndOffset - offsetDetail.offsets(i))
             case None => 0
           }
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
@@ -319,6 +319,8 @@ class KafkaApis(val requestChannel: Requ
                 val replica = replicaOpt.get
                 debug("Leader %d for topic %s partition %d received fetch request from follower %d"
                   .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
+                  .format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
             }
         }
@@ -382,36 +384,47 @@ class KafkaApis(val requestChannel: Requ
 
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val zkClient = kafkaZookeeper.getZookeeperClient
-    val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+    var errorCode = ErrorMapping.NoError
+    val config = logManager.config
 
-    metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
-      val topic = topicAndMetadata._1
-      topicAndMetadata._2 match {
-        case Some(metadata) => topicsMetadata += metadata
-        case None =>
-          /* check if auto creation of topics is turned on */
-          val config = logManager.config
-          if(config.autoCreateTopics) {
-            CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-            info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-              .format(topic, config.numPartitions, config.defaultReplicationFactor))
-            val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-            newTopicMetadata match {
-              case Some(topicMetadata) => topicsMetadata += topicMetadata
-              case None =>
-                throw new IllegalStateException("Topic metadata for automatically created topic %s does not exist".format(topic))
+    try {
+      val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+
+      metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
+        val topic = topicAndMetadata._1
+        topicAndMetadata._2.errorCode match {
+          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
+          case ErrorMapping.UnknownTopicCode =>
+            /* check if auto creation of topics is turned on */
+            if(config.autoCreateTopics) {
+              CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                .format(topic, config.numPartitions, config.defaultReplicationFactor))
+              val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+              newTopicMetadata.errorCode match {
+                case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
+                case _ =>
+                  throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
+              }
             }
-          }
+          case _ => error("Error while fetching topic metadata for topic " + topic,
+            ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
+        }
       }
+    }catch {
+      case e => error("Error while retrieving topic metadata", e)
+        // convert exception type to error code
+        errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
     }
-    val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
+    topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
+    val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
   def close() {
     fetchRequestPurgatory.shutdown()
   }
-  
+
   /**
    * A delayed fetch request
    */
@@ -423,7 +436,7 @@ class KafkaApis(val requestChannel: Requ
    * A holding pen for fetch requests waiting to be satisfied
    */
   class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData] {
-    
+
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
@@ -432,7 +445,7 @@ class KafkaApis(val requestChannel: Requ
       val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
-    
+
     /**
      * When a request expires just answer it with whatever data is present
      */
@@ -522,7 +535,7 @@ class KafkaApis(val requestChannel: Requ
                 numAcks, produce.requiredAcks,
                 topic, partitionId))
               if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
-                  (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
+                (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
                 /*
                  * requiredAcks < 0 means acknowledge after all replicas in ISR
                  * are fully caught up to the (local) leader's offset
@@ -563,7 +576,7 @@ class KafkaApis(val requestChannel: Requ
 
       override def toString =
         "acksPending:%b, error: %d, requiredOffset: %d".format(
-        acksPending, error, requiredOffset
+          acksPending, error, requiredOffset
         )
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Tue Jul 24 18:13:01 2012
@@ -113,9 +113,9 @@ class KafkaConfig(props: Properties) ext
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
 
-  val keepInSyncTimeMs = Utils.getLong(props, "isr.in.sync.time.ms", 30000)
+  val replicaMaxLagTimeMs = Utils.getLong(props, "replica.max.lag.time.ms", 10000)
 
-  val keepInSyncBytes = Utils.getLong(props, "isr.in.sync.bytes", 4000)
+  val replicaMaxLagBytes = Utils.getLong(props, "replica.max.lag.bytes", 4000)
 
   /* size of the state change request queue in Zookeeper */
   val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Tue Jul 24 18:13:01 2012
@@ -134,11 +134,15 @@ class ControllerChannelManager(allBroker
 
   def removeBroker(brokerId: Int){
     brokers.remove(brokerId)
-    messageChannels(brokerId).disconnect()
-    messageChannels.remove(brokerId)
-    messageQueues.remove(brokerId)
-    messageThreads(brokerId).shutDown()
-    messageThreads.remove(brokerId)
+    try {
+      messageChannels(brokerId).disconnect()
+      messageChannels.remove(brokerId)
+      messageQueues.remove(brokerId)
+      messageThreads(brokerId).shutDown()
+      messageThreads.remove(brokerId)
+    }catch {
+      case e => error("Error while removing broker by the controller", e)
+    }
   }
 }
 
@@ -189,10 +193,10 @@ class KafkaController(config : KafkaConf
   }
 
   def shutDown() = {
-    if(controllerChannelManager != null){
+    if(controllerChannelManager != null)
       controllerChannelManager.shutDown()
-    }
-    zkClient.close()
+    if(zkClient != null)
+      zkClient.close()
   }
 
   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Jul 24 18:13:01 2012
@@ -25,6 +25,7 @@ import java.util.concurrent._
 import atomic.AtomicBoolean
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.ZkClient
+import kafka.common.KafkaZookeeperClient
 
 
 /**
@@ -44,6 +45,7 @@ class KafkaServer(val config: KafkaConfi
   private var replicaManager: ReplicaManager = null
   private var apis: KafkaApis = null
   var kafkaController: KafkaController = new KafkaController(config)
+  var zkClient: ZkClient = null
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
@@ -59,6 +61,9 @@ class KafkaServer(val config: KafkaConfi
       needRecovery = false
       cleanShutDownFile.delete
     }
+    /* start client */
+    info("connecting to ZK: " + config.zkConnect)
+    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
     logManager = new LogManager(config,
                                 SystemTime,
                                 1000L * 60 * config.logCleanupIntervalMinutes,
@@ -73,9 +78,9 @@ class KafkaServer(val config: KafkaConfi
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
 
-    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica, makeLeader, makeFollower)
+    kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
 
-    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient)
+    replicaManager = new ReplicaManager(config, time, zkClient)
 
     apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
     requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
@@ -112,6 +117,8 @@ class KafkaServer(val config: KafkaConfi
         kafkaController.shutDown()
 
       kafkaZookeeper.close
+      info("Closing zookeeper client...")
+      zkClient.close()
 
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
       debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Tue Jul 24 18:13:01 2012
@@ -23,7 +23,7 @@ import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
 import kafka.admin.AdminUtils
-import java.lang.{Thread, IllegalStateException}
+import java.lang.Thread
 import collection.mutable.HashSet
 import kafka.common._
 
@@ -34,13 +34,13 @@ import kafka.common._
  *
  */
 class KafkaZooKeeper(config: KafkaConfig,
+                     zkClient: ZkClient,
                      addReplicaCbk: (String, Int, Set[Int]) => Replica,
                      getReplicaCbk: (String, Int) => Option[Replica],
                      becomeLeader: (Replica, Seq[Int]) => Unit,
                      becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
-  private var zkClient: ZkClient = null
   private var leaderChangeListener: LeaderChangeListener = null
   private var topicPartitionsChangeListener: TopicChangeListener = null
   private var stateChangeHandler: StateChangeCommandHandler = null
@@ -49,9 +49,8 @@ class KafkaZooKeeper(config: KafkaConfig
   private val leaderChangeLock = new Object
 
   def startup() {
-    /* start client */
-    info("connecting to ZK: " + config.zkConnect)
-    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    leaderChangeListener = new LeaderChangeListener
+    topicPartitionsChangeListener = new TopicChangeListener
     leaderChangeListener = new LeaderChangeListener
     topicPartitionsChangeListener = new TopicChangeListener
     startStateChangeCommandHandler()
@@ -106,11 +105,7 @@ class KafkaZooKeeper(config: KafkaConfig
   }
 
   def close() {
-    if (zkClient != null) {
-      stateChangeHandler.shutdown()
-      info("Closing zookeeper client...")
-      zkClient.close()
-    }
+    stateChangeHandler.shutdown()
   }
 
   def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
@@ -122,10 +117,10 @@ class KafkaZooKeeper(config: KafkaConfig
     ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
       case Some(leader) =>
         if(leader != config.brokerId)
-          throw new NotLeaderForPartitionException("Broker %d is not leader for partition %d for topic %s"
+          throw new LeaderNotAvailableException("Broker %d is not leader for partition %d for topic %s"
             .format(config.brokerId, partition, topic))
       case None =>
-        throw new NoLeaderForPartitionException("There is no leader for topic %s partition %d".format(topic, partition))
+        throw new LeaderNotAvailableException("There is no leader for topic %s partition %d".format(topic, partition))
     }
   }
 
@@ -345,7 +340,8 @@ class KafkaZooKeeper(config: KafkaConfig
       val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
       val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
       val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
-      debug("Leader change listener fired for path %s. New leader is %d. New epoch is %d".format(dataPath, newLeader, newEpoch))
+      debug("Leader change listener fired on broker %d for path %s. New leader is %d. New epoch is %d".format(config.brokerId,
+        dataPath, newLeader, newEpoch))
       val topicPartitionInfo = dataPath.split("/")
       val topic = topicPartitionInfo.takeRight(4).head
       val partition = topicPartitionInfo.takeRight(2).head.toInt

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Tue Jul 24 18:13:01 2012
@@ -22,10 +22,10 @@ import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
 
 class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
-        extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
-          socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
-          fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
-          minBytes = brokerConfig.replicaMinBytes) {
+  extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
+    socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
+    fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
+    minBytes = brokerConfig.replicaMinBytes) {
 
   // process fetched data
   def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
@@ -35,10 +35,15 @@ class ReplicaFetcherThread(name:String, 
 
     if (fetchOffset != replica.logEndOffset())
       throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset()))
+    trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d".format(replica.brokerId,
+      replica.logEndOffset(), messageSet.sizeInBytes, partitionData.hw))
     replica.log.get.append(messageSet)
-    replica.highWatermark(Some(partitionData.hw))
-    trace("follower %d set replica highwatermark for topic %s partition %d to %d"
-          .format(replica.brokerId, topic, partitionId, partitionData.hw))
+    trace("Follower %d has replica log end offset %d after appending %d messages"
+      .format(replica.brokerId, replica.logEndOffset(), messageSet.sizeInBytes))
+    val followerHighWatermark = replica.logEndOffset().min(partitionData.hw)
+    replica.highWatermark(Some(followerHighWatermark))
+    trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+      .format(replica.brokerId, topic, partitionId, followerHighWatermark))
   }
 
   // handle a partition whose offset is out of range and return a new fetch offset

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Tue Jul 24 18:13:01 2012
@@ -19,12 +19,11 @@ package kafka.server
 import kafka.log.Log
 import kafka.cluster.{Partition, Replica}
 import collection.mutable
-import java.lang.IllegalStateException
 import mutable.ListBuffer
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
-import kafka.common.InvalidPartitionException
+import kafka.common.{KafkaException, InvalidPartitionException}
 
 class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
 
@@ -36,7 +35,7 @@ class ReplicaManager(val config: KafkaCo
 
   // start ISR expiration thread
   isrExpirationScheduler.startUp
-  isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
+  isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.replicaMaxLagTimeMs)
 
   def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
     val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
@@ -112,7 +111,7 @@ class ReplicaManager(val config: KafkaCo
       case Some(replicas) =>
         Some(replicas.leaderReplica())
       case None =>
-        throw new IllegalStateException("Getting leader replica failed. Partition replica metadata for topic " +
+        throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
       "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
     }
   }
@@ -136,15 +135,17 @@ class ReplicaManager(val config: KafkaCo
     if(newHw > oldHw) {
       debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
       partition.leaderHW(Some(newHw))
-    }
+    }else
+      debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic,
+        replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
   }
 
   def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
-    // stop replica fetcher thread, if any
-    replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
     // read and cache the ISR
     replica.partition.leaderId(Some(replica.brokerId))
     replica.partition.updateISR(currentISRInZk.toSet)
+    // stop replica fetcher thread, if any
+    replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
     // also add this partition to the list of partitions for which the leader is the current broker
     try {
       leaderReplicaLock.lock()
@@ -157,6 +158,8 @@ class ReplicaManager(val config: KafkaCo
   def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
     info("broker %d intending to follow leader %d for topic %s partition %d"
       .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+    // set the leader for this partition correctly on this broker
+    replica.partition.leaderId(Some(leaderBrokerId))
     // remove this replica's partition from the ISR expiration queue
     try {
       leaderReplicaLock.lock()
@@ -186,12 +189,11 @@ class ReplicaManager(val config: KafkaCo
   def maybeShrinkISR(): Unit = {
     try {
       info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
-        .format(config.keepInSyncTimeMs))
-
+        .format(config.replicaMaxLagTimeMs))
       leaderReplicaLock.lock()
       leaderReplicas.foreach { partition =>
          // shrink ISR if a follower is slow or stuck
-        val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.keepInSyncTimeMs, config.keepInSyncBytes)
+        val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
         if(outOfSyncReplicas.size > 0) {
           val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
           assert(newInSyncReplicas.size > 0)
@@ -215,7 +217,7 @@ class ReplicaManager(val config: KafkaCo
       val leaderHW = partition.leaderHW()
       replica.logEndOffset() >= leaderHW
     }
-    else throw new IllegalStateException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
+    else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
       " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
   }
 
@@ -230,9 +232,10 @@ class ReplicaManager(val config: KafkaCo
           // update ISR in ZK and cache
           replica.partition.updateISR(newISR.map(_.brokerId), Some(zkClient))
         }
+        debug("Recording follower %d position %d for topic %s partition %d".format(replicaId, offset, topic, partition))
         maybeIncrementLeaderHW(replica)
       case None =>
-        throw new IllegalStateException("No replica %d in replica manager on %d".format(replicaId, config.brokerId))
+        throw new KafkaException("No replica %d in replica manager on %d".format(replicaId, config.brokerId))
     }
   }
 
@@ -242,12 +245,14 @@ class ReplicaManager(val config: KafkaCo
       case Some(replica) =>
         replica.logEndOffsetUpdateTime(Some(time.milliseconds))
       case None =>
-        throw new IllegalStateException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
+        throw new KafkaException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
     }
   }
 
   def close() {
+    info("Closing replica manager on broker " + config.brokerId)
     isrExpirationScheduler.shutdown()
     replicaFetcherManager.shutdown()
+    info("Replica manager shutdown on broker " + config.brokerId)
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala Tue Jul 24 18:13:01 2012
@@ -18,7 +18,7 @@
 package kafka.server
 
 import util.parsing.json.JSON
-import java.lang.IllegalStateException
+import kafka.common.KafkaException
 import kafka.utils.{Utils, Logging}
 import collection.mutable.HashMap
 
@@ -45,10 +45,10 @@ object StateChangeCommand extends Loggin
               request match {
                 case StartReplica => new StartReplica(topic, partition, epoch)
                 case CloseReplica => new CloseReplica(topic, partition, epoch)
-                case _ => throw new IllegalStateException("Unknown state change request " + request)
+                case _ => throw new KafkaException("Unknown state change request " + request)
               }
             case None =>
-              throw new IllegalStateException("Illegal state change request JSON " + requestJson)
+              throw new KafkaException("Illegal state change request JSON " + requestJson)
           }
         case None => throw new RuntimeException("Error parsing state change request : " + requestJson)
       }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala Tue Jul 24 18:13:01 2012
@@ -17,6 +17,9 @@
 
 package kafka.utils
 
+import kafka.common.KafkaException
+import java.lang.IllegalStateException
+
 class State
 object DONE extends State
 object READY extends State

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Tue Jul 24 18:13:01 2012
@@ -19,7 +19,6 @@ package kafka.utils
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.lang.IllegalStateException
 
 /**
  * A scheduler for running jobs in the background

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Tue Jul 24 18:13:01 2012
@@ -20,7 +20,7 @@ package kafka.utils
 import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.{SimpleConsumer, ConsumerConfig}
 import kafka.api.OffsetRequest
-import java.lang.IllegalStateException
+import kafka.common.KafkaException
 
 /**
  *  A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
@@ -58,13 +58,13 @@ object UpdateOffsetsInZK {
 
       val broker = brokerHostingPartition match {
         case Some(b) => b
-        case None => throw new IllegalStateException("Broker " + brokerHostingPartition + " is unavailable. Cannot issue " +
+        case None => throw new KafkaException("Broker " + brokerHostingPartition + " is unavailable. Cannot issue " +
           "getOffsetsBefore request")
       }
 
       val brokerInfos = ZkUtils.getBrokerInfoFromIds(zkClient, List(broker))
       if(brokerInfos.size == 0)
-        throw new IllegalStateException("Broker information for broker id %d does not exist in ZK".format(broker))
+        throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
 
       val brokerInfo = brokerInfos.head
       val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Tue Jul 24 18:13:01 2012
@@ -30,6 +30,7 @@ import kafka.message.{NoCompressionCodec
 import org.I0Itec.zkclient.ZkClient
 import java.util.{Random, Properties}
 import joptsimple.{OptionSpec, OptionSet, OptionParser}
+import kafka.common.KafkaException
 
 
 /**
@@ -145,7 +146,7 @@ object Utils extends Logging {
     if(string == null) {
       buffer.putShort(-1)
     } else if(string.length > Short.MaxValue) {
-      throw new IllegalArgumentException("String exceeds the maximum size of " + Short.MaxValue + ".")
+      throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
     } else {
       buffer.putShort(string.length.asInstanceOf[Short])
       buffer.put(string.getBytes(encoding))
@@ -163,7 +164,7 @@ object Utils extends Logging {
     } else {
       val encodedString = string.getBytes(encoding)
       if(encodedString.length > Short.MaxValue) {
-        throw new IllegalArgumentException("String exceeds the maximum size of " + Short.MaxValue + ".")
+        throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
       } else {
         2 + encodedString.length
       }
@@ -188,7 +189,7 @@ object Utils extends Logging {
     if(props.containsKey(name))
       return getInt(props, name, -1)
     else
-      throw new IllegalArgumentException("Missing required property '" + name + "'")
+      throw new KafkaException("Missing required property '" + name + "'")
   }
   
   /**
@@ -211,7 +212,7 @@ object Utils extends Logging {
    * @param name The property name
    * @param default The default value to use if the property is not found
    * @param range The range in which the value must fall (inclusive)
-   * @throws IllegalArgumentException If the value is not in the given range
+   * @throws KafkaException If the value is not in the given range
    * @return the integer value
    */
   def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
@@ -221,7 +222,7 @@ object Utils extends Logging {
       else
         default
     if(v < range._1 || v > range._2)
-      throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+      throw new KafkaException(name + " has value " + v + " which is not in the range " + range + ".")
     else
       v
   }
@@ -233,7 +234,7 @@ object Utils extends Logging {
       else
         default
     if(v < range._1 || v > range._2)
-      throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+      throw new KafkaException(name + " has value " + v + " which is not in the range " + range + ".")
     else
       v
   }
@@ -241,21 +242,21 @@ object Utils extends Logging {
   def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
     val value = buffer.getInt
     if(value < range._1 || value > range._2)
-      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+      throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
     else value
   }
 
   def getShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = {
     val value = buffer.getShort
     if(value < range._1 || value > range._2)
-      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+      throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
     else value
   }
 
   def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
     val value = buffer.getLong
     if(value < range._1 || value > range._2)
-      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+      throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
     else value
   }
 
@@ -266,7 +267,7 @@ object Utils extends Logging {
     if(props.containsKey(name))
       return getLong(props, name, -1)
     else
-      throw new IllegalArgumentException("Missing required property '" + name + "'")
+      throw new KafkaException("Missing required property '" + name + "'")
   }
 
   /**
@@ -286,7 +287,7 @@ object Utils extends Logging {
    * @param name The property name
    * @param default The default value to use if the property is not found
    * @param range The range in which the value must fall (inclusive)
-   * @throws IllegalArgumentException If the value is not in the given range
+   * @throws KafkaException If the value is not in the given range
    * @return the long value
    */
   def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = {
@@ -296,7 +297,7 @@ object Utils extends Logging {
       else
         default
     if(v < range._1 || v > range._2)
-      throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+      throw new KafkaException(name + " has value " + v + " which is not in the range " + range + ".")
     else
       v
   }
@@ -316,7 +317,7 @@ object Utils extends Logging {
     else if("false" == props.getProperty(name))
       false
     else
-      throw new IllegalArgumentException("Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false" )
+      throw new KafkaException("Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false" )
   }
   
   /**
@@ -336,7 +337,7 @@ object Utils extends Logging {
     if(props.containsKey(name))
       props.getProperty(name)
     else
-      throw new IllegalArgumentException("Missing required property '" + name + "'")
+      throw new KafkaException("Missing required property '" + name + "'")
   }
 
   /**
@@ -350,13 +351,13 @@ object Utils extends Logging {
       for(i <- 0 until propValues.length) {
         val prop = propValues(i).split("=")
         if(prop.length != 2)
-          throw new IllegalArgumentException("Illegal format of specifying properties '" + propValues(i) + "'")
+          throw new KafkaException("Illegal format of specifying properties '" + propValues(i) + "'")
         properties.put(prop(0), prop(1))
       }
       properties
     }
     else
-      throw new IllegalArgumentException("Missing required property '" + name + "'")
+      throw new KafkaException("Missing required property '" + name + "'")
   }
 
   /**
@@ -367,12 +368,12 @@ object Utils extends Logging {
       val propString = props.getProperty(name)
       val propValues = propString.split(",")
       if(propValues.length < 1)
-        throw new IllegalArgumentException("Illegal format of specifying properties '" + propString + "'")
+        throw new KafkaException("Illegal format of specifying properties '" + propString + "'")
       val properties = new Properties
       for(i <- 0 until propValues.length) {
         val prop = propValues(i).split("=")
         if(prop.length != 2)
-          throw new IllegalArgumentException("Illegal format of specifying properties '" + propValues(i) + "'")
+          throw new KafkaException("Illegal format of specifying properties '" + propValues(i) + "'")
         properties.put(prop(0), prop(1))
       }
       properties
@@ -608,7 +609,7 @@ object Utils extends Logging {
   
   def notNull[V](v: V) = {
     if(v == null)
-      throw new IllegalArgumentException("Value cannot be null.")
+      throw new KafkaException("Value cannot be null.")
     else
       v
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Tue Jul 24 18:13:01 2012
@@ -18,7 +18,6 @@
 package kafka.utils
 
 import java.util.Properties
-import java.util.concurrent.locks.Condition
 import kafka.cluster.{Broker, Cluster}
 import kafka.common.NoEpochForPartitionException
 import kafka.consumer.TopicCount
@@ -27,6 +26,7 @@ import org.I0Itec.zkclient.exception.{Zk
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
 import util.parsing.json.JSON
+import java.util.concurrent.locks.{ReentrantLock, Condition}
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -481,18 +481,29 @@ object ZkUtils extends Logging {
 
 }
 
-class LeaderExists(topic: String, partition: Int, leaderExists: Condition) extends IZkDataListener {
+class LeaderExistsListener(topic: String, partition: Int, leaderLock: ReentrantLock, leaderExists: Condition) extends IZkDataListener {
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
     val t = dataPath.split("/").takeRight(3).head
     val p = dataPath.split("/").takeRight(2).head.toInt
-    if(t == topic && p == partition)
-      leaderExists.signal()
+    leaderLock.lock()
+    try {
+      if(t == topic && p == partition)
+        leaderExists.signal()
+    }
+    finally {
+      leaderLock.unlock()
+    }
   }
 
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
-    leaderExists.signal()
+    leaderLock.lock()
+    try {
+      leaderExists.signal()
+    }finally {
+      leaderLock.unlock()
+    }
   }
 
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Tue Jul 24 18:13:01 2012
@@ -20,6 +20,7 @@ import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
+import kafka.common.ErrorMapping
 import kafka.utils.TestUtils
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -137,13 +138,28 @@ class AdminTest extends JUnit3Suite with
       10 -> List("1", "2", "3"),
       11 -> List("1", "3", "4")
     )
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
-
+    val leaderForPartitionMap = Map(
+      0 -> 0,
+      1 -> 1,
+      2 -> 2,
+      3 -> 3,
+      4 -> 4,
+      5 -> 0,
+      6 -> 1,
+      7 -> 2,
+      8 -> 3,
+      9 -> 4,
+      10 -> 1,
+      11 -> 1
+    )
     val topic = "test"
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    // create leaders for all partitions
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
     val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-                                  .get.partitionsMetadata.map(p => p.replicas)
+                                  .partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
     for( i <- 0 until actualReplicaList.size ) {
@@ -165,23 +181,30 @@ class AdminTest extends JUnit3Suite with
       0 -> List("0", "1", "2"),
       1 -> List("1", "2", "3")
     )
+    val leaderForPartitionMap = Map(
+      0 -> 0,
+      1 -> 1
+    )
     val topic = "auto-topic"
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    // create leaders for all partitions
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
 
     val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-    newTopicMetadata match {
-      case Some(metadata) =>
-        assertEquals(topic, metadata.topic)
-        assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
-        assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
-        val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
+    newTopicMetadata.errorCode match {
+      case ErrorMapping.UnknownTopicCode =>
+        fail("Topic " + topic + " should've been automatically created")
+      case _ =>
+        assertEquals(topic, newTopicMetadata.topic)
+        assertNotNull("partition metadata list cannot be null", newTopicMetadata.partitionsMetadata)
+        assertEquals("partition metadata list length should be 2", 2, newTopicMetadata.partitionsMetadata.size)
+        val actualReplicaAssignment = newTopicMetadata.partitionsMetadata.map(p => p.replicas)
         val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
         assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
         for(i <- 0 until actualReplicaList.size) {
           assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
         }
-      case None => fail("Topic " + topic + " should've been automatically created")
     }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala Tue Jul 24 18:13:01 2012
@@ -21,6 +21,7 @@ import kafka.server._
 import kafka.utils.{Utils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
+import kafka.common.KafkaException
 
 /**
  * A test harness that brings up some number of broker nodes
@@ -33,7 +34,7 @@ trait KafkaServerTestHarness extends JUn
   override def setUp() {
     super.setUp
     if(configs.size <= 0)
-      throw new IllegalArgumentException("Must suply at least one server config.")
+      throw new KafkaException("Must suply at least one server config.")
     servers = configs.map(TestUtils.createServer(_))
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Tue Jul 24 18:13:01 2012
@@ -18,7 +18,6 @@
 package kafka.integration
 
 import kafka.api.FetchRequestBuilder
-import kafka.common.OffsetOutOfRangeException
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
@@ -26,6 +25,7 @@ import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.producer.ProducerData
 import kafka.utils.TestUtils
+import kafka.common.{KafkaException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -41,7 +41,7 @@ class LazyInitProducerTest extends JUnit
   override def setUp() {
     super.setUp
     if(configs.size <= 0)
-      throw new IllegalArgumentException("Must suply at least one server config.")
+      throw new KafkaException("Must suply at least one server config.")
 
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)    
@@ -133,6 +133,8 @@ class LazyInitProducerTest extends JUnit
       produceList ::= new ProducerData[String, Message](topic, topic, set)
       builder.addFetch(topic, 0, 0, 10000)
     }
+    // wait until leader is elected
+    topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500))
     producer.send(produceList: _*)
 
     // wait a bit for produced message to be available
@@ -157,6 +159,9 @@ class LazyInitProducerTest extends JUnit
       produceList ::= new ProducerData[String, Message](topic, topic, set)
       builder.addFetch(topic, 0, 0, 10000)
     }
+    // wait until leader is elected
+    topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1500))
+
     producer.send(produceList: _*)
 
     producer.send(produceList: _*)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Tue Jul 24 18:13:01 2012
@@ -25,11 +25,12 @@ import kafka.log.LogManager
 import junit.framework.Assert._
 import org.easymock.EasyMock
 import kafka.network._
-import kafka.api.{TopicMetaDataResponse, TopicMetadataRequest}
 import kafka.cluster.Broker
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
+import kafka.common.ErrorMapping
+import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
 
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -66,18 +67,38 @@ class TopicMetadataTest extends JUnit3Su
     // create topic
     val topic = "test"
     CreateTopicCommand.createTopic(zkClient, topic, 1)
-
-    mockLogManagerAndTestTopic(topic)
+    // set up leader for topic partition 0
+    val leaderForPartitionMap = Map(
+      0 -> configs.head.brokerId
+    )
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    val topicMetadata = mockLogManagerAndTestTopic(topic)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
+    assertEquals(1, partitionMetadata.head.replicas.size)
   }
 
   def testAutoCreateTopic {
     // auto create topic
     val topic = "test"
 
-    mockLogManagerAndTestTopic(topic)
+    val topicMetadata = mockLogManagerAndTestTopic(topic)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
+    assertEquals(0, partitionMetadata.head.replicas.size)
+    assertEquals(None, partitionMetadata.head.leader)
+    assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
   }
 
-  private def mockLogManagerAndTestTopic(topic: String) = {
+  private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
     // topic metadata request only requires 2 APIs from the log manager
     val logManager = EasyMock.createMock(classOf[LogManager])
     val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
@@ -109,16 +130,11 @@ class TopicMetadataTest extends JUnit3Su
     
     // check assertions
     val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(brokers, partitionMetadata.head.replicas)
-    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
 
     // verify the expected calls to log manager occurred in the right order
     EasyMock.verify(kafkaZookeeper)
     EasyMock.verify(receivedRequest)
+
+    topicMetadata
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Jul 24 18:13:01 2012
@@ -23,8 +23,8 @@ import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils.{Utils, TestUtils, Range}
-import kafka.common.OffsetOutOfRangeException
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.common.{KafkaException, OffsetOutOfRangeException}
 
 class LogTest extends JUnitSuite {
   
@@ -58,7 +58,7 @@ class LogTest extends JUnitSuite {
       new Log(logDir, 1024, 1000, false)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
-      case e: IllegalStateException => "This is good"
+      case e: KafkaException => "This is good"
     }
   }
 
@@ -157,14 +157,14 @@ class LogTest extends JUnitSuite {
     {
       // first test a log segment starting at 0
       val log = new Log(logDir, 100, 1000, false)
-      val curOffset = log.nextAppendOffset
+      val curOffset = log.logEndOffset
       assertEquals(curOffset, 0)
 
       // time goes by; the log file is deleted
       log.markDeletedWhile(_ => true)
 
       // we now have a new log; the starting offset of the new log should remain 0
-      assertEquals(curOffset, log.nextAppendOffset)
+      assertEquals(curOffset, log.logEndOffset)
     }
 
     {
@@ -174,12 +174,12 @@ class LogTest extends JUnitSuite {
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
 
-      val curOffset = log.nextAppendOffset
+      val curOffset = log.logEndOffset
       // time goes by; the log file is deleted
       log.markDeletedWhile(_ => true)
 
       // we now have a new log
-      assertEquals(curOffset, log.nextAppendOffset)
+      assertEquals(curOffset, log.logEndOffset)
 
       // time goes by; the log file (which is empty) is deleted again
       val deletedSegments = log.markDeletedWhile(_ => true)
@@ -188,7 +188,7 @@ class LogTest extends JUnitSuite {
       assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
 
       // we now have a new log
-      assertEquals(curOffset, log.nextAppendOffset)
+      assertEquals(curOffset, log.logEndOffset)
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala Tue Jul 24 18:13:01 2012
@@ -20,6 +20,7 @@ package kafka.log
 import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite
+import kafka.common.KafkaException
 
 class SegmentListTest extends JUnitSuite {
 
@@ -74,7 +75,7 @@ class SegmentListTest extends JUnitSuite
         sl.truncLast(-1)
         fail("Attempt to truncate with illegal index should fail")
       }catch {
-        case e: IllegalArgumentException => // this is ok
+        case e: KafkaException => // this is ok
       }
       val deleted = sl.truncLast(4)
       assertEquals(hd, sl.view.iterator.toList)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Tue Jul 24 18:13:01 2012
@@ -212,14 +212,14 @@ class AsyncProducerTest extends JUnit3Su
     topic2Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes))))
     val topic2Broker2Data = new ListBuffer[ProducerData[Int,Message]]
     topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
-    val expectedResult = Map(
+    val expectedResult = Some(Map(
         0 -> Map(
               ("topic1", 0) -> topic1Broker1Data,
               ("topic2", 0) -> topic2Broker1Data),
         1 -> Map(
               ("topic1", 1) -> topic1Broker2Data,
               ("topic2", 1) -> topic2Broker2Data)
-      )
+      ))
 
     val actualResult = handler.partitionAndCollate(producerDataList)
     assertEquals(expectedResult, actualResult)
@@ -356,10 +356,15 @@ class AsyncProducerTest extends JUnit3Su
     producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
     producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg3".getBytes)))
 
-    val partitionedData = handler.partitionAndCollate(producerDataList)
-    for ((brokerId, dataPerBroker) <- partitionedData) {
-      for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
-        assertTrue(partitionId == 0)
+    val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
+    partitionedDataOpt match {
+      case Some(partitionedData) =>
+        for ((brokerId, dataPerBroker) <- partitionedData) {
+          for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
+            assertTrue(partitionId == 0)
+        }
+      case None =>
+        fail("Failed to collate requests by topic, partition")
     }
     EasyMock.verify(producerPool)
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Tue Jul 24 18:13:01 2012
@@ -92,7 +92,7 @@ class ProducerTest extends JUnit3Suite w
     props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props1.put("zk.connect", TestZKUtils.zookeeperConnect)
     props1.put("producer.request.required.acks", "2")
-    props1.put("producer.request.ack.timeout.ms", "-1")
+    props1.put("producer.request.timeout.ms", "1000")
 
     val props2 = new util.Properties()
     props2.putAll(props1)
@@ -155,7 +155,7 @@ class ProducerTest extends JUnit3Suite w
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("socket.timeout.ms", "2000")
+    props.put("producer.request.timeout.ms", "2000")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     // create topic
@@ -213,7 +213,7 @@ class ProducerTest extends JUnit3Suite w
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("socket.timeout.ms", String.valueOf(timeoutMs))
+    props.put("producer.request.timeout.ms", String.valueOf(timeoutMs))
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Tue Jul 24 18:13:01 2012
@@ -153,7 +153,7 @@ class SyncProducerTest extends JUnit3Sui
     Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
     Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
 
-    // the middle message should have been rejected because the topic does not exist
+    // the middle message should have been rejected because broker doesn't lead partition
     Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, response2.errors(1))
     Assert.assertEquals(-1, response2.offsets(1))
   }
@@ -167,7 +167,7 @@ class SyncProducerTest extends JUnit3Sui
     props.put("host", "localhost")
     props.put("port", server.socketServer.port.toString)
     props.put("buffer.size", "102400")
-    props.put("socket.timeout.ms", String.valueOf(timeoutMs))
+    props.put("producer.request.timeout.ms", String.valueOf(timeoutMs))
     val producer = new SyncProducer(new SyncProducerConfig(props))
 
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Tue Jul 24 18:13:01 2012
@@ -30,8 +30,8 @@ class ISRExpirationTest extends JUnit3Su
 
   var topicPartitionISR: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
-    override val keepInSyncTimeMs = 100L
-    override val keepInSyncBytes = 10L
+    override val replicaMaxLagTimeMs = 100L
+    override val replicaMaxLagBytes = 10L
   })
   val topic = "foo"
 
@@ -53,7 +53,7 @@ class ISRExpirationTest extends JUnit3Su
     time.sleep(150)
     leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
 
-    var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+    var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
 
     // add all replicas back to the ISR
@@ -65,8 +65,8 @@ class ISRExpirationTest extends JUnit3Su
     (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
     time.sleep(150)
     // now follower broker id 1 has caught upto only 3, while the leader is at 5 AND follower broker id 1 hasn't
-    // pulled any data for > keepInSyncTimeMs ms. So it is stuck
-    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+    // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
+    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
     EasyMock.verify(log)
   }
@@ -87,7 +87,7 @@ class ISRExpirationTest extends JUnit3Su
     time.sleep(10)
     (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
 
-    val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+    val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
 
     EasyMock.verify(log)
@@ -137,10 +137,10 @@ class ISRExpirationTest extends JUnit3Su
       time.sleep(10)
       (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
 
-      val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+      val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
       assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
 
-      val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+      val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
       assertEquals("Replica 0 should be out of sync", Set(configs.last.brokerId), partition1OSR.map(_.brokerId))
 
       EasyMock.verify(log0)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Tue Jul 24 18:13:01 2012
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.server
 
@@ -51,7 +51,6 @@ class LeaderElectionTest extends JUnit3S
     val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
 
     servers ++= List(server1, server2)
-    try {
     // start 2 brokers
     val topic = "new-topic"
     val partitionId = 0
@@ -66,14 +65,13 @@ class LeaderElectionTest extends JUnit3S
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
     // kill the server hosting the preferred replica
-    servers.head.shutdown()
+    server1.shutdown()
 
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
-    Thread.sleep(zookeeper.tickTime)
-
+    val leaderPath = zkClient.getChildren(ZkUtils.getTopicPartitionPath(topic, "0"))
     // bring the preferred replica back
     servers.head.startup()
 
@@ -86,14 +84,9 @@ class LeaderElectionTest extends JUnit3S
 
     // test if the leader is the preferred replica
     assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
-    }catch {
-      case e => error("Error while running leader election test ", e)
-    } finally {
-      // shutdown the servers and delete data hosted on them
-      servers.map(server => server.shutdown())
-      servers.map(server => Utils.rm(server.config.logDir))
-
-    }
+    // shutdown the servers and delete data hosted on them
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDir))
   }
 
   // Assuming leader election happens correctly, test if epoch changes as expected
@@ -105,27 +98,23 @@ class LeaderElectionTest extends JUnit3S
     // setup 2 brokers in ZK
     val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
 
-    try {
-      // create topic with 1 partition, 2 replicas, one on each broker
-      CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
-
-      var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
-      assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
-
-      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
-      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
-      assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
-
-      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
-      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
-      assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
-      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
-
-    }finally {
-      TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
-    }
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+    var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+    assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
+    assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
+
+    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+    newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
+    assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
+    assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
+
+    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+    newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+    assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
+    assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
 
+    TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
   }
 }
\ No newline at end of file