You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 00:51:01 UTC

svn commit: r1367811 [1/4] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/network/ main/sca...

Author: junrao
Date: Tue Jul 31 22:50:59 2012
New Revision: 1367811

URL: http://svn.apache.org/viewvc?rev=1367811&view=rev
Log:
revert commit to KAFKA-343 due to unit test failures

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
      - copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala
      - copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala
      - copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala
      - copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala
      - copied unchanged from r1367618, incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Tue Jul 31 22:50:59 2012
@@ -91,10 +91,10 @@ object AdminUtils extends Logging {
     topics.map { topic =>
       if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
         val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic).iterator).get(topic).get
-        val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
+        val sortedPartitions = topicPartitionAssignment.toList.sortWith( (m1,m2) => m1._1.toInt < m2._1.toInt )
 
         val partitionMetadata = sortedPartitions.map { partitionMap =>
-          val partition = partitionMap._1
+          val partition = partitionMap._1.toInt
           val replicas = partitionMap._2
           val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
           val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala Tue Jul 31 22:50:59 2012
@@ -25,54 +25,39 @@ import collection.mutable.HashMap
 
 
 object LeaderAndISR {
-  val initialLeaderEpoch: Int = 0
-  val initialZKVersion: Int = 0
   def readFrom(buffer: ByteBuffer): LeaderAndISR = {
     val leader = buffer.getInt
     val leaderGenId = buffer.getInt
     val ISRString = Utils.readShortString(buffer, "UTF-8")
     val ISR = ISRString.split(",").map(_.toInt).toList
-    val zkVersion = buffer.getInt
+    val zkVersion = buffer.getLong
     new LeaderAndISR(leader, leaderGenId, ISR, zkVersion)
   }
 }
 
-case class LeaderAndISR(var leader: Int, var leaderEpoch: Int, var ISR: List[Int], var zkVersion: Int){
-  def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndISR.initialLeaderEpoch, ISR, LeaderAndISR.initialZKVersion)
-
+case class LeaderAndISR(leader: Int, leaderEpoc: Int, ISR: List[Int], zkVersion: Long){
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(leader)
-    buffer.putInt(leaderEpoch)
+    buffer.putInt(leaderEpoc)
     Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8")
-    buffer.putInt(zkVersion)
+    buffer.putLong(zkVersion)
   }
 
   def sizeInBytes(): Int = {
-    val size = 4 + 4 + (2 + ISR.mkString(",").length) + 4
+    val size = 4 + 4 + (2 + ISR.mkString(",").length) + 8
     size
   }
-
-  override def toString(): String = {
-    val jsonDataMap = new HashMap[String, String]
-    jsonDataMap.put("leader", leader.toString)
-    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
-    jsonDataMap.put("ISR", ISR.mkString(","))
-    Utils.stringMapToJsonString(jsonDataMap)
-  }
 }
 
 
 object LeaderAndISRRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
-  val IsInit: Boolean = true
-  val NotInit: Boolean = false
-  val DefaultAckTimeout: Int = 1000
 
   def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
-    val isInit = if(buffer.get() == 1.toByte) true else false
+    val isInit = buffer.get()
     val ackTimeoutMs = buffer.getInt
     val leaderAndISRRequestCount = buffer.getInt
     val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
@@ -91,18 +76,19 @@ object LeaderAndISRRequest {
 
 case class LeaderAndISRRequest (versionId: Short,
                                 clientId: String,
-                                isInit: Boolean,
+                                isInit: Byte,
                                 ackTimeoutMs: Int,
-                                leaderAndISRInfos: Map[(String, Int), LeaderAndISR])
+                                leaderAndISRInfos:
+                                Map[(String, Int), LeaderAndISR])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
-  def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
-    this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, LeaderAndISRRequest.DefaultAckTimeout, leaderAndISRInfos)
+  def this(isInit: Byte, ackTimeoutMs: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
+    this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeoutMs, leaderAndISRInfos)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
-    buffer.put(if(isInit) 1.toByte else 0.toByte)
+    buffer.put(isInit)
     buffer.putInt(ackTimeoutMs)
     buffer.putInt(leaderAndISRInfos.size)
     for((key, value) <- leaderAndISRInfos){

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Tue Jul 31 22:50:59 2012
@@ -26,7 +26,6 @@ import collection.mutable.Set
 object StopReplicaRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
-  val DefaultAckTimeout = 100
 
   def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
     val versionId = buffer.getShort
@@ -44,10 +43,10 @@ object StopReplicaRequest {
 case class StopReplicaRequest(versionId: Short,
                               clientId: String,
                               ackTimeoutMs: Int,
-                              stopReplicaSet: Set[(String, Int)])
-        extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
-  def this(stopReplicaSet: Set[(String, Int)]) = {
-    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet)
+                              stopReplicaSet: Set[(String, Int)]
+                                     ) extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
+  def this(ackTimeoutMs: Int, stopReplicaSet: Set[(String, Int)]) = {
+    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeoutMs, stopReplicaSet)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -68,4 +67,4 @@ case class StopReplicaRequest(versionId:
     }
     size
   }
-}
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Tue Jul 31 22:50:59 2012
@@ -19,7 +19,7 @@ package kafka.cluster
 
 import kafka.utils.Utils._
 import java.nio.ByteBuffer
-import kafka.common.BrokerNotExistException
+import kafka.common.KafkaException
 
 /**
  * A Kafka broker
@@ -28,7 +28,7 @@ private[kafka] object Broker {
 
   def createBroker(id: Int, brokerInfoString: String): Broker = {
     if(brokerInfoString == null)
-      throw new BrokerNotExistException("Broker id %s does not exist".format(id))
+      throw new KafkaException("Broker id %s does not exist".format(id))
     val brokerInfo = brokerInfoString.split(":")
     new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Tue Jul 31 22:50:59 2012
@@ -16,12 +16,11 @@
  */
 package kafka.cluster
 
+import kafka.utils.{SystemTime, Time, Logging}
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils._
 import java.util.concurrent.locks.ReentrantLock
-import scala.collection._
-import kafka.utils.{ZkUtils, SystemTime, Time}
 import kafka.common.{KafkaException, LeaderNotAvailableException}
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.Logging
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -120,8 +119,14 @@ class Partition(val topic: String,
   }
 
   def updateISR(newISR: Set[Int], zkClientOpt: Option[ZkClient] = None) {
-    try{
+    try {
       leaderISRUpdateLock.lock()
+      zkClientOpt match {
+        case Some(zkClient) =>
+          // update ISR in ZK
+          updateISRInZk(newISR, zkClient)
+        case None =>
+      }
       // update partition's ISR in cache
       inSyncReplicas = newISR.map {r =>
         getReplica(r) match {
@@ -130,22 +135,29 @@ class Partition(val topic: String,
         }
       }
       info("Updated ISR for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
-      if(zkClientOpt.isDefined){
-        val zkClient = zkClientOpt.get
-        val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId)
-        curLeaderAndISR match {
-          case None =>
-            throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId))
-          case Some(m) =>
-            m.ISR = newISR.toList
-            ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString)
-        }
-      }
-    } finally {
+    }catch {
+      case e => throw new KafkaException("Failed to update ISR for topic %s ".format(topic) +
+        "partition %d to %s".format(partitionId, newISR.mkString(",")), e)
+    }finally {
       leaderISRUpdateLock.unlock()
     }
   }
 
+  private def updateISRInZk(newISR: Set[Int], zkClient: ZkClient) = {
+    val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString))
+    if(replicaListAndEpochString == null) {
+      throw new LeaderNotAvailableException(("Illegal partition state. ISR cannot be updated for topic " +
+        "%s partition %d since leader and ISR does not exist in ZK".format(topic, partitionId)))
+    }
+    else {
+      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+      val epoch = replicasAndEpochInfo.last
+      updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString),
+        "%s;%s".format(newISR.mkString(","), epoch))
+      info("Updated ISR for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(",")))
+    }
+  }
+
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Partition]))
       return false
@@ -168,4 +180,4 @@ class Partition(val topic: String,
     partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
     partitionString.toString()
   }
-}
\ No newline at end of file
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Tue Jul 31 22:50:59 2012
@@ -39,8 +39,7 @@ object ErrorMapping {
   val NotLeaderForPartitionCode : Short = 7
   val UnknownTopicCode : Short = 8
   val RequestTimedOutCode: Short = 9
-  val BrokerNotExistInZookeeperCode: Short = 10
-  val ReplicaNotAvailableCode: Short = 11
+  val ReplicaNotAvailableCode: Short = 10
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -52,9 +51,8 @@ object ErrorMapping {
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
-      classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode,
-      classOf[BrokerNotExistException].asInstanceOf[Class[Throwable]] -> BrokerNotExistInZookeeperCode,
-      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode
+      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
+      classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Tue Jul 31 22:50:59 2012
@@ -88,7 +88,8 @@ class ConsumerFetcherManager(private val
 
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), config, sourceBroker, this)
+    new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+                              config, sourceBroker, this)
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala Tue Jul 31 22:50:59 2012
@@ -66,7 +66,7 @@ private[kafka] object TopicCount extends
                           consumerId: String,
                           zkClient: ZkClient) : TopicCount = {
     val dirs = new ZKGroupDirs(group)
-    val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1
+    val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
     val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER)
     val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Tue Jul 31 22:50:59 2012
@@ -296,7 +296,7 @@ private[kafka] class ZookeeperConsumerCo
     try {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
       val znode = topicDirs.consumerOffsetDir + "/" + partitionId
-      val offsetString = readDataMaybeNull(zkClient, znode)._1
+      val offsetString = readDataMaybeNull(zkClient, znode)
       if (offsetString != null)
         return offsetString.toLong
       else
@@ -416,7 +416,7 @@ private[kafka] class ZookeeperConsumerCo
       }
     }
 
-    private def deletePartitionOwnershipFromZK(topic: String, partition: Int) {
+    private def deletePartitionOwnershipFromZK(topic: String, partition: String) {
       val topicDirs = new ZKGroupTopicDirs(group, topic)
       val znode = topicDirs.consumerOwnerDir + "/" + partition
       deletePath(zkClient, znode)
@@ -427,7 +427,7 @@ private[kafka] class ZookeeperConsumerCo
       info("Releasing partition ownership")
       for ((topic, infos) <- localTopicRegistry) {
         for(partition <- infos.keys)
-          deletePartitionOwnershipFromZK(topic, partition)
+          deletePartitionOwnershipFromZK(topic, partition.toString)
         localTopicRegistry.remove(topic)
       }
     }
@@ -484,7 +484,7 @@ private[kafka] class ZookeeperConsumerCo
 
       releasePartitionOwnership(topicRegistry)
 
-      var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
+      var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
       var currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
 
       for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
@@ -492,7 +492,7 @@ private[kafka] class ZookeeperConsumerCo
 
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         val curConsumers = consumersPerTopicMap.get(topic).get
-        var curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
+        var curPartitions: Seq[String] = partitionsPerTopicMap.get(topic).get
 
         val nPartsPerConsumer = curPartitions.size / curConsumers.size
         val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
@@ -602,13 +602,13 @@ private[kafka] class ZookeeperConsumerCo
       }
     }
 
-    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, Int), String]): Boolean = {
-      var successfullyOwnedPartitions : List[(String, Int)] = Nil
+    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+      var successfullyOwnedPartitions : List[(String, String)] = Nil
       val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
         val topic = partitionOwner._1._1
         val partition = partitionOwner._1._2
         val consumerThreadId = partitionOwner._2
-        val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
+        val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic,partition)
         try {
           createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
           info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
@@ -633,29 +633,29 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
-                                      topicDirs: ZKGroupTopicDirs, partition: Int,
+                                      topicDirs: ZKGroupTopicDirs, partition: String,
                                       topic: String, consumerThreadId: String) {
       val partTopicInfoMap = currentTopicRegistry.get(topic)
 
       // find the leader for this partition
-      val leaderOpt = getLeaderForPartition(zkClient, topic, partition)
+      val leaderOpt = getLeaderForPartition(zkClient, topic, partition.toInt)
       leaderOpt match {
-        case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s".
+        case None => throw new NoBrokersForPartitionException("No leader available for partition %s on topic %s".
           format(partition, topic))
-        case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l))
+        case Some(l) => debug("Leader for partition %s for topic %s is %d".format(partition, topic, l))
       }
       val leader = leaderOpt.get
 
       val znode = topicDirs.consumerOffsetDir + "/" + partition
-      val offsetString = readDataMaybeNull(zkClient, znode)._1
+      val offsetString = readDataMaybeNull(zkClient, znode)
       // If first time starting a consumer, set the initial offset based on the config
       var offset : Long = 0L
       if (offsetString == null)
         offset = config.autoOffsetReset match {
               case OffsetRequest.SmallestTimeString =>
-                  earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime)
+                  earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.EarliestTime)
               case OffsetRequest.LargestTimeString =>
-                  earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime)
+                  earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.LatestTime)
               case _ =>
                   throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
         }
@@ -666,12 +666,12 @@ private[kafka] class ZookeeperConsumerCo
       val fetchedOffset = new AtomicLong(offset)
       val partTopicInfo = new PartitionTopicInfo(topic,
                                                  leader,
-                                                 partition,
+                                                 partition.toInt,
                                                  queue,
                                                  consumedOffset,
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchSize))
-      partTopicInfoMap.put(partition, partTopicInfo)
+      partTopicInfoMap.put(partition.toInt, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -29,6 +29,7 @@ import kafka.common.{KafkaException, Inv
 
 object Log {
   val FileSuffix = ".kafka"
+  val hwFileName = "highwatermark"
 
   /**
    * Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
@@ -114,8 +115,8 @@ class LogSegment(val file: File, val mes
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[kafka] class Log( val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean, brokerId: Int = 0) extends Logging {
-  this.logIdent = "Kafka Log on Broker " + brokerId + ", "
+private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean)
+  extends Logging {
 
   import kafka.log.Log._
 
@@ -125,7 +126,7 @@ private[kafka] class Log( val dir: File,
   /* The current number of unflushed messages appended to the write */
   private val unflushed = new AtomicInteger(0)
 
-  /* last time it was flushed */
+   /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
 
   /* The actual segments of the log */
@@ -190,7 +191,8 @@ private[kafka] class Log( val dir: File,
         val curr = segments.get(i)
         val next = segments.get(i+1)
         if(curr.start + curr.size != next.start)
-          throw new KafkaException("The following segments don't validate: " + curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
+          throw new KafkaException("The following segments don't validate: " +
+                  curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
       }
     }
   }
@@ -229,12 +231,13 @@ private[kafka] class Log( val dir: File,
     BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages)
     BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
     logStats.recordAppendedMessages(numberOfMessages)
-
+    
     // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
     val validByteBuffer = messages.getBuffer.duplicate()
     val messageSetValidBytes = messages.validBytes
     if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
-      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+        " Message set cannot be appended to log. Possible causes are corrupted produce requests")
 
     validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
     val validMessages = new ByteBufferMessageSet(validByteBuffer)
@@ -344,11 +347,12 @@ private[kafka] class Log( val dir: File,
     if (unflushed.get == 0) return
 
     lock synchronized {
-      debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " + System.currentTimeMillis)
+      debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
+          System.currentTimeMillis)
       segments.view.last.messageSet.flush()
       unflushed.set(0)
       lastflushedTime.set(System.currentTimeMillis)
-    }
+     }
   }
 
   def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
@@ -371,15 +375,15 @@ private[kafka] class Log( val dir: File,
       case OffsetRequest.EarliestTime =>
         startIndex = 0
       case _ =>
-        var isFound = false
-        debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
-        startIndex = offsetTimeArray.length - 1
-        while (startIndex >= 0 && !isFound) {
-          if (offsetTimeArray(startIndex)._2 <= request.time)
-            isFound = true
-          else
-            startIndex -=1
-        }
+          var isFound = false
+          debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
+          startIndex = offsetTimeArray.length - 1
+          while (startIndex >= 0 && !isFound) {
+            if (offsetTimeArray(startIndex)._2 <= request.time)
+              isFound = true
+            else
+              startIndex -=1
+          }
     }
 
     val retSize = request.maxNumOffsets.min(startIndex + 1)
@@ -404,13 +408,7 @@ private[kafka] class Log( val dir: File,
     }
   }
 
-
-  def deleteWholeLog():Unit = {
-    deleteSegments(segments.contents.get())
-    Utils.rm(dir)
-  }
-
-  /* Attempts to delete all provided segments from a log and returns how many it was able to */
+  /* Attemps to delete all provided segments from a log and returns how many it was able to */
   def deleteSegments(segments: Seq[LogSegment]): Int = {
     var total = 0
     for(segment <- segments) {
@@ -426,27 +424,30 @@ private[kafka] class Log( val dir: File,
   }
 
   def truncateTo(targetOffset: Long) {
-    // find the log segment that has this hw
-    val segmentToBeTruncated = segments.view.find(
-      segment => targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
-
-    segmentToBeTruncated match {
-      case Some(segment) =>
-        val truncatedSegmentIndex = segments.view.indexOf(segment)
-        segments.truncLast(truncatedSegmentIndex)
-        segment.truncateTo(targetOffset)
-        info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
-      case None =>
-        assert(targetOffset <= segments.view.last.absoluteEndOffset, "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
-        error("Cannot truncate log to %d since the log start offset is %d and end offset is %d".format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
-    }
+      // find the log segment that has this hw
+      val segmentToBeTruncated = segments.view.find(segment =>
+        targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
+
+      segmentToBeTruncated match {
+        case Some(segment) =>
+          val truncatedSegmentIndex = segments.view.indexOf(segment)
+          segments.truncLast(truncatedSegmentIndex)
+          segment.truncateTo(targetOffset)
+          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
+        case None =>
+          assert(targetOffset <= segments.view.last.absoluteEndOffset,
+            "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
+              format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
+          error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
+            .format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
+      }
 
-    val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
-    if(segmentsToBeDeleted.size < segments.view.size) {
+      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
+      if(segmentsToBeDeleted.size < segments.view.size) {
       val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
       if(numSegmentsDeleted != segmentsToBeDeleted.size)
         error("Failed to delete some segments during log recovery")
-    }
+      }
   }
 
   def topicName():String = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Tue Jul 31 22:50:59 2012
@@ -1,11 +1,11 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ 	* Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -35,7 +35,7 @@ private[kafka] class LogManager(val conf
                                 val logCleanupIntervalMs: Long,
                                 val logCleanupDefaultAgeMs: Long,
                                 needRecovery: Boolean) extends Logging {
-
+  
   val logDir: File = new File(config.logDir)
   private val numPartitions = config.numPartitions
   private val maxSize: Long = config.logFileSize
@@ -44,7 +44,6 @@ private[kafka] class LogManager(val conf
   private val logFlushIntervals = config.flushIntervalMap
   private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
   private val logRetentionSize = config.logRetentionSize
-  this.logIdent = "Log Manager on Broker " + config.brokerId + ", "
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -61,11 +60,11 @@ private[kafka] class LogManager(val conf
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, flushInterval, needRecovery, config.brokerId)
-        val topicPartition = Utils.getTopicPartition(dir.getName)
-        logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
-        val parts = logs.get(topicPartition._1)
-        parts.put(topicPartition._2, log)
+        val log = new Log(dir, maxSize, flushInterval, needRecovery)
+        val topicPartion = Utils.getTopicPartition(dir.getName)
+        logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
+        val parts = logs.get(topicPartion._1)
+        parts.put(topicPartion._2, log)
       }
     }
   }
@@ -79,9 +78,9 @@ private[kafka] class LogManager(val conf
       info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
       scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
       info("Starting log flusher every " + config.flushSchedulerThreadRate +
-                   " ms with the following overrides " + logFlushIntervals)
+        " ms with the following overrides " + logFlushIntervals)
       scheduler.scheduleWithRate(flushAllLogs, "kafka-logflusher-",
-                                 config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
+        config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
     }
   }
 
@@ -94,14 +93,14 @@ private[kafka] class LogManager(val conf
       throw new InvalidTopicException("Topic name can't be emtpy")
     if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
       val error = "Wrong partition %d, valid partitions (0, %d)."
-              .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
+        .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
       warn(error)
       throw new InvalidPartitionException(error)
     }
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, flushInterval, false, config.brokerId)
+      new Log(d, maxSize, flushInterval, false)
     }
   }
 
@@ -196,19 +195,18 @@ private[kafka] class LogManager(val conf
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
-    debug("Log cleanup completed. " + total + " files deleted in " +
-                  (time.milliseconds - startMs) / 1000 + " seconds")
+    debug("Log cleanup completed. " + total + " files deleted in " + 
+                 (time.milliseconds - startMs) / 1000 + " seconds")
   }
-
+  
   /**
    * Close all the logs
    */
   def shutdown() {
-    info("shut down")
+    info("Closing log manager")
     allLogs.foreach(_.close())
-    info("shutted down completedly")
   }
-
+  
   /**
    * Get all the partition logs
    */
@@ -224,7 +222,7 @@ private[kafka] class LogManager(val conf
         if(logFlushIntervals.contains(log.topicName))
           logFlushInterval = logFlushIntervals(log.topicName)
         debug(log.topicName + " flush interval  " + logFlushInterval +
-                      " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
+            " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Tue Jul 31 22:50:59 2012
@@ -31,13 +31,12 @@ import kafka.utils._
  *   N Processor threads that each have their own selector and read requests from sockets
  *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
  */
-class SocketServer(val brokerId: Int,
-                   val port: Int,
+class SocketServer(val port: Int,
                    val numProcessorThreads: Int, 
                    val monitoringPeriodSecs: Int,
                    val maxQueuedRequests: Int,
                    val maxRequestSize: Int = Int.MaxValue) extends Logging {
-  this.logIdent = "Socket Server on Broker " + brokerId + ", "
+
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
   private var acceptor: Acceptor = new Acceptor(port, processors)
@@ -58,18 +57,18 @@ class SocketServer(val brokerId: Int,
     // start accepting connections
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
-    info("started")
+    info("Kafka socket server started")
   }
 
   /**
    * Shutdown the socket server
    */
   def shutdown() = {
-    info("shutting down")
+    info("Shutting down socket server")
     acceptor.shutdown
     for(processor <- processors)
       processor.shutdown
-    info("shutted down completely")
+    info("Shut down socket server complete")
   }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Tue Jul 31 22:50:59 2012
@@ -23,7 +23,7 @@ import kafka.cluster.Broker
 
 abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
-  private val fetcherThreadMap = new mutable.HashMap[(Int, Int), AbstractFetcherThread]
+  private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = name + " "
 
@@ -52,7 +52,7 @@ abstract class AbstractFetcherManager(pr
   }
 
   def removeFetcher(topic: String, partitionId: Int) {
-    info("removing fetcher on topic %s, partition %d".format(topic, partitionId))
+    info("%s removing fetcher on topic %s, partition %d".format(name, topic, partitionId))
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Jul 31 22:50:59 2012
@@ -25,29 +25,24 @@ import kafka.common._
 import kafka.log._
 import kafka.message._
 import kafka.network._
+import kafka.utils.{SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
 import scala.math._
 import kafka.network.RequestChannel.Response
-import kafka.utils.{ZkUtils, SystemTime, Logging}
-import kafka.cluster.Replica
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
-                val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper,
-                addReplicaCbk: (String, Int, Set[Int]) => Replica,
-                stopReplicaCbk: (String, Int) => Short,
-                becomeLeader: (Replica, LeaderAndISR) => Short,
-                becomeFollower: (Replica, LeaderAndISR) => Short,
-                brokerId: Int) extends Logging {
+class KafkaApis(val requestChannel: RequestChannel,
+                val logManager: LogManager,
+                val replicaManager: ReplicaManager,
+                val kafkaZookeeper: KafkaZooKeeper) extends Logging {
 
-  private val produceRequestPurgatory = new ProducerRequestPurgatory(brokerId)
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
+  private val produceRequestPurgatory = new ProducerRequestPurgatory
+  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
-  this.logIdent = "KafkaApi on Broker " + brokerId + ", "
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -67,46 +62,12 @@ class KafkaApis(val requestChannel: Requ
 
 
   def handleLeaderAndISRRequest(request: RequestChannel.Request){
-    val responseMap = new HashMap[(String, Int), Short]
     val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
-    info("handling leader and isr request " + leaderAndISRRequest)
-
-    for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
-      var errorCode = ErrorMapping.NoError
-      val topic = partitionInfo._1
-      val partition = partitionInfo._2
-
-      // If the partition does not exist locally, create it
-      if(replicaManager.getPartition(topic, partition) == None){
-        trace("the partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition))
-        val assignedReplicas = ZkUtils.getReplicasForPartition(kafkaZookeeper.getZookeeperClient, topic, partition)
-        trace("assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString))
-        if(assignedReplicas.contains(brokerId)) {
-          val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
-          info("starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
-        }
-      }
-      val replica = replicaManager.getReplica(topic, partition).get
-      // The command ask this broker to be new leader for P and it isn't the leader yet
-      val requestedLeaderId = leaderAndISR.leader
-      // If the broker is requested to be the leader and it's not current the leader (the leader id is set and not equal to broker id)
-      if(requestedLeaderId == brokerId && (!replica.partition.leaderId().isDefined || replica.partition.leaderId().get != brokerId)){
-        info("becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
-        errorCode = becomeLeader(replica, leaderAndISR)
-      }
-      else if (requestedLeaderId != brokerId) {
-        info("becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
-        errorCode = becomeFollower(replica, leaderAndISR)
-      }
-
-      responseMap.put(partitionInfo, errorCode)
-    }
+    val responseMap = new HashMap[(String, Int), Short]
 
-    if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){
-      replicaManager.startHighWaterMarksCheckPointThread
-      val partitionsToRemove = replicaManager.allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).keySet
-      info("init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-      partitionsToRemove.foreach(p => stopReplicaCbk(p._1, p._2))
+    // TODO: put in actual logic later
+    for((key, value) <- leaderAndISRRequest.leaderAndISRInfos){
+      responseMap.put(key, ErrorMapping.NoError)
     }
 
     val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
@@ -118,9 +79,9 @@ class KafkaApis(val requestChannel: Requ
     val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
     val responseMap = new HashMap[(String, Int), Short]
 
+    // TODO: put in actual logic later
     for((topic, partition) <- stopReplicaRequest.stopReplicaSet){
-      val errorCode = stopReplicaCbk(topic, partition)
-      responseMap.put((topic, partition), errorCode)
+      responseMap.put((topic, partition), ErrorMapping.NoError)
     }
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
@@ -134,7 +95,7 @@ class KafkaApis(val requestChannel: Requ
     var satisfied = new mutable.ArrayBuffer[DelayedFetch]
     for(partitionData <- partitionDatas)
       satisfied ++= fetchRequestPurgatory.update((topic, partitionData.partition), partitionData)
-    trace("produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
+    trace("Produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
@@ -150,11 +111,11 @@ class KafkaApis(val requestChannel: Requ
     val produceRequest = ProducerRequest.readFrom(request.request.buffer)
     val sTime = SystemTime.milliseconds
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("producer request %s".format(produceRequest.toString))
+      requestLogger.trace("Producer request " + request.toString)
     trace("Broker %s received produce request %s".format(logManager.config.brokerId, produceRequest.toString))
 
     val response = produceToLocalLog(produceRequest)
-    debug("produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
     if (produceRequest.requiredAcks == 0 ||
         produceRequest.requiredAcks == 1 ||
@@ -172,11 +133,13 @@ class KafkaApis(val requestChannel: Requ
           (topic, partitionData.partition)
         })
       })
+
       val delayedProduce = new DelayedProduce(
         topicPartitionPairs, request,
         response.errors, response.offsets,
         produceRequest, produceRequest.ackTimeoutMs.toLong)
       produceRequestPurgatory.watch(delayedProduce)
+
       /*
        * Replica fetch requests may have arrived (and potentially satisfied)
        * delayedProduce requests before they even made it to the purgatory.
@@ -184,9 +147,10 @@ class KafkaApis(val requestChannel: Requ
        */
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
       topicPartitionPairs.foreach(topicPartition =>
-                                    satisfiedProduceRequests ++=
-                                            produceRequestPurgatory.update(topicPartition, topicPartition))
-      debug("%d DelayedProduce requests unblocked after produce to local log.".format(satisfiedProduceRequests.size))
+        satisfiedProduceRequests ++=
+          produceRequestPurgatory.update(topicPartition, topicPartition))
+      debug(satisfiedProduceRequests.size +
+        " DelayedProduce requests unblocked after produce to local log.")
       satisfiedProduceRequests.foreach(_.respond())
     }
   }
@@ -195,10 +159,10 @@ class KafkaApis(val requestChannel: Requ
    * Helper method for handling a parsed producer request
    */
   private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
-    trace("produce [%s] to local log ".format(request.toString))
     val requestSize = request.topicPartitionCount
     val errors = new Array[Short](requestSize)
     val offsets = new Array[Long](requestSize)
+
     var msgIndex = -1
     for(topicData <- request.data) {
       for(partitionData <- topicData.partitionDataArray) {
@@ -217,7 +181,7 @@ class KafkaApis(val requestChannel: Requ
           case e =>
             BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-            error("error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
+            error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
             e match {
               case _: IOException =>
                 fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
@@ -229,8 +193,7 @@ class KafkaApis(val requestChannel: Requ
         }
       }
     }
-    val ret = new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
-    ret
+    new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
   }
 
   /**
@@ -238,7 +201,9 @@ class KafkaApis(val requestChannel: Requ
    */
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = FetchRequest.readFrom(request.request.buffer)
-    trace("handling fetch request: " + fetchRequest.toString)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Fetch request " + fetchRequest.toString)
+    trace("Broker %s received fetch request %s".format(logManager.config.brokerId, fetchRequest.toString))
     // validate the request
     try {
       fetchRequest.validate()
@@ -260,7 +225,8 @@ class KafkaApis(val requestChannel: Requ
           )
         })
       })
-      debug("replica %d fetch unblocked %d DelayedProduce requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
+      trace("Replica %d fetch unblocked %d DelayedProduce requests.".format(
+        fetchRequest.replicaId, satisfiedProduceRequests.size))
       satisfiedProduceRequests.foreach(_.respond())
     }
 
@@ -270,11 +236,11 @@ class KafkaApis(val requestChannel: Requ
        availableBytes >= fetchRequest.minBytes ||
        fetchRequest.numPartitions <= 0) {
       val topicData = readMessageSets(fetchRequest)
-      debug("returning fetch response %s for fetch request with correlation id %d".format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
+      debug("Returning fetch response %s for fetch request with correlation id %d"
+        .format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
-      debug("putting fetch request into purgatory")
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val topicPartitionPairs: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
       val delayedFetch = new DelayedFetch(topicPartitionPairs, request, fetchRequest, fetchRequest.maxWait, availableBytes)
@@ -290,6 +256,7 @@ class KafkaApis(val requestChannel: Requ
     for(offsetDetail <- fetchRequest.offsetInfo) {
       for(i <- 0 until offsetDetail.partitions.size) {
         try {
+          debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
           val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
           val available = maybeLog match {
             case Some(log) => max(0, log.logEndOffset - offsetDetail.offsets(i))
@@ -298,7 +265,7 @@ class KafkaApis(val requestChannel: Requ
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
           case e: InvalidPartitionException =>
-            info("invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
+            info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
         }
       }
     }
@@ -307,13 +274,13 @@ class KafkaApis(val requestChannel: Requ
 
   private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
     val offsets = fetchRequest.offsetInfo
-    debug("act on update partition HW, check offset detail: %s ".format(offsets))
+
     for(offsetDetail <- offsets) {
       val topic = offsetDetail.topic
       val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
       for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
         replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset,
-                                              kafkaZookeeper.getZookeeperClient)
+          kafkaZookeeper.getZookeeperClient)
       }
     }
   }
@@ -343,17 +310,21 @@ class KafkaApis(val requestChannel: Requ
             BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
             BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
             val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
-            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(logManager.config.brokerId))
+            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) +
+              " must exist on leader broker %d".format(logManager.config.brokerId))
             val leaderReplica = leaderReplicaOpt.get
             fetchRequest.replicaId match {
               case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
               case _ => // fetch request from a follower
                 val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
-                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, replicaManager.config.brokerId))
+                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d"
+                  .format(fetchRequest.replicaId, replicaManager.config.brokerId))
                 val replica = replicaOpt.get
-                debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
-                debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+                  .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
+                  .format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
             }
         }
@@ -372,7 +343,7 @@ class KafkaApis(val requestChannel: Requ
     try {
       // check if the current broker is the leader for the partitions
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
-      trace("fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+      trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
       val log = logManager.getLog(topic, partition)
       response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
     } catch {
@@ -389,7 +360,7 @@ class KafkaApis(val requestChannel: Requ
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("offset request " + offsetRequest.toString)
+      requestLogger.trace("Offset request " + offsetRequest.toString)
     var response: OffsetResponse = null
     try {
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
@@ -401,7 +372,8 @@ class KafkaApis(val requestChannel: Requ
         System.exit(1)
       case e =>
         warn("Error while responding to offset request", e)
-        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
+        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
+          ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
     }
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
@@ -412,39 +384,41 @@ class KafkaApis(val requestChannel: Requ
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("topic metadata request " + metadataRequest.toString())
+      requestLogger.trace("Topic metadata request " + metadataRequest.toString())
+
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val zkClient = kafkaZookeeper.getZookeeperClient
     var errorCode = ErrorMapping.NoError
     val config = logManager.config
+
     try {
       val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
-      metadataRequest.topics.zip(topicMetadataList).foreach(
-        topicAndMetadata =>{
-          val topic = topicAndMetadata._1
-          topicAndMetadata._2.errorCode match {
-            case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
-            case ErrorMapping.UnknownTopicCode =>
-              /* check if auto creation of topics is turned on */
-              if(config.autoCreateTopics) {
-                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-                info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                             .format(topic, config.numPartitions, config.defaultReplicationFactor))
-                val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-                newTopicMetadata.errorCode match {
-                  case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
-                  case _ =>
-                    throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
-                }
+
+      metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
+        val topic = topicAndMetadata._1
+        topicAndMetadata._2.errorCode match {
+          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
+          case ErrorMapping.UnknownTopicCode =>
+            /* check if auto creation of topics is turned on */
+            if(config.autoCreateTopics) {
+              CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                .format(topic, config.numPartitions, config.defaultReplicationFactor))
+              val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+              newTopicMetadata.errorCode match {
+                case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
+                case _ =>
+                  throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
               }
-            case _ => error("Error while fetching topic metadata for topic " + topic,
-                            ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
-          }
-        })
+            }
+          case _ => error("Error while fetching topic metadata for topic " + topic,
+            ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
+        }
+      }
     }catch {
       case e => error("Error while retrieving topic metadata", e)
-      // convert exception type to error code
-      errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        // convert exception type to error code
+        errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
     }
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
     val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
@@ -452,10 +426,7 @@ class KafkaApis(val requestChannel: Requ
   }
 
   def close() {
-    debug("shut down")
     fetchRequestPurgatory.shutdown()
-    produceRequestPurgatory.shutdown()
-    debug("shutted down completely")
   }
 
   /**
@@ -468,7 +439,7 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData]("Fetch Request Purgatory on Broker " + brokerId + ", ") {
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData] {
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
@@ -476,7 +447,6 @@ class KafkaApis(val requestChannel: Requ
     def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = {
       val messageDataSize = partitionData.messages.sizeInBytes
       val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
-      debug("fetch request check, accm size: " + accumulatedSize + " delay fetch min bytes: " + delayedFetch.fetch.minBytes)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
 
@@ -496,7 +466,7 @@ class KafkaApis(val requestChannel: Requ
                        requiredOffsets: Array[Long],
                        val produce: ProducerRequest,
                        delayMs: Long)
-          extends DelayedRequest(keys, request, delayMs) with Logging {
+    extends DelayedRequest(keys, request, delayMs) with Logging {
 
     /**
      * Map of (topic, partition) -> partition status
@@ -525,15 +495,15 @@ class KafkaApis(val requestChannel: Requ
 
     def respond() {
       val errorsAndOffsets: (List[Short], List[Long]) = (
-              keys.foldRight
-                      ((List[Short](), List[Long]()))
-                      ((key: Any, result: (List[Short], List[Long])) => {
-                        val status = partitionStatus(key)
-                        (status.error :: result._1, status.requiredOffset :: result._2)
-                      })
-              )
+        keys.foldRight
+          ((List[Short](), List[Long]()))
+          ((key: Any, result: (List[Short], List[Long])) => {
+            val status = partitionStatus(key)
+            (status.error :: result._1, status.requiredOffset :: result._2)
+          })
+        )
       val response = new ProducerResponse(produce.versionId, produce.correlationId,
-                                          errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
+        errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
@@ -569,7 +539,7 @@ class KafkaApis(val requestChannel: Requ
                 numAcks, produce.requiredAcks,
                 topic, partitionId))
               if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
-                      (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
+                (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
                 /*
                  * requiredAcks < 0 means acknowledge after all replicas in ISR
                  * are fully caught up to the (local) leader's offset
@@ -618,7 +588,8 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, (String, Int)]("Producer Request Purgatory on Broker " + brokerId + ", ") {
+  private [kafka] class ProducerRequestPurgatory
+    extends RequestPurgatory[DelayedProduce, (String, Int)] {
 
     protected def checkSatisfied(fetchRequestPartition: (String, Int),
                                  delayedProduce: DelayedProduce) =