Posted to commits@kafka.apache.org by jj...@apache.org on 2012/09/17 22:16:00 UTC

svn commit: r1386806 [1/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/...

Author: jjkoshy
Date: Mon Sep 17 20:15:59 2012
New Revision: 1386806

URL: http://svn.apache.org/viewvc?rev=1386806&view=rev
Log:
KAFKA-391 Refactor fetch/producer requests to use maps instead of several arrays; patched by Joel Koshy; reviewed by Jun Rao.
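
A minimal sketch (not part of the commit itself) of the map-keyed request style this patch introduces. It uses only names that appear in the diff below (FetchRequestBuilder.addFetch/build, TopicAndPartition, PartitionFetchInfo, FetchRequest.requestInfo); the topics, offsets and fetch sizes are made up for illustration:

    import kafka.api.{FetchRequestBuilder, PartitionFetchInfo}
    import kafka.common.TopicAndPartition

    object Kafka391Sketch {
      def main(args: Array[String]) {
        // The builder now collects fetch targets into a single
        // Map[TopicAndPartition, PartitionFetchInfo] instead of parallel arrays
        // of partitions, offsets and fetch sizes.
        val request = new FetchRequestBuilder()
          .addFetch("topic-a", 0, 0L, 1024 * 1024)  // topic, partition, offset, fetchSize
          .addFetch("topic-b", 3, 500L, 64 * 1024)
          .build()

        // requestInfo is keyed by (topic, partition), so per-partition lookup and
        // grouping by topic (requestInfoGroupedByTopic) fall out of the Map API.
        request.requestInfo.foreach {
          case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
            println("%s-%d: offset=%d fetchSize=%d".format(topic, partition, offset, fetchSize))
        }
      }
    }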

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FetchRequestFormatException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/
Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java Mon Sep 17 20:15:59 2012
@@ -26,10 +26,10 @@ import java.util.Random;
 import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLRequest;
 import kafka.etl.Props;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.javaapi.producer.SyncProducer;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
-import kafka.producer.SyncProducerConfig;
+import kafka.producer.ProducerConfig;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -47,7 +47,7 @@ public class DataGenerator {
 			System.currentTimeMillis());
 
 	protected Props _props;
-	protected SyncProducer _producer = null;
+	protected Producer _producer = null;
 	protected URI _uri = null;
 	protected String _topic;
 	protected int _count;
@@ -70,12 +70,12 @@ public class DataGenerator {
 		
 		System.out.println("server uri:" + _uri.toString());
         Properties producerProps = new Properties();
-        producerProps.put("host", _uri.getHost());
-        producerProps.put("port", String.valueOf(_uri.getPort()));
+        producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
         producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
         producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
         producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
-		_producer = new SyncProducer(new SyncProducerConfig(producerProps));
+        
+		_producer = new Producer(new ProducerConfig(producerProps));
 			
 	}
 
@@ -91,7 +91,8 @@ public class DataGenerator {
 		}
 		// send events
 		System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
-		_producer.send(_topic, new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, list));
+        ProducerData<Integer, Message> pd = new ProducerData<Integer, Message>(_topic, null, list);
+		_producer.send(pd);
 
 		// close the producer
 		_producer.close();

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Mon Sep 17 20:15:59 2012
@@ -18,60 +18,13 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.utils.Utils
-import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
-import kafka.common.{KafkaException, FetchRequestFormatException}
-
-object OffsetDetail {
-
-  def readFrom(buffer: ByteBuffer): OffsetDetail = {
-    val topic = Utils.readShortString(buffer, "UTF-8")
-
-    val partitionsCount = buffer.getInt
-    val partitions = new Array[Int](partitionsCount)
-    for (i <- 0 until partitions.length)
-      partitions(i) = buffer.getInt
-
-    val offsetsCount = buffer.getInt
-    val offsets = new Array[Long](offsetsCount)
-    for (i <- 0 until offsets.length)
-      offsets(i) = buffer.getLong
-
-    val fetchesCount = buffer.getInt
-    val fetchSizes = new Array[Int](fetchesCount)
-    for (i <- 0 until fetchSizes.length)
-      fetchSizes(i) = buffer.getInt
+import kafka.utils.{nonthreadsafe, Utils}
+import scala.collection.immutable.Map
+import kafka.common.TopicAndPartition
 
-    new OffsetDetail(topic, partitions, offsets, fetchSizes)
-  }
-
-}
-
-case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) {
-
-  def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic, "UTF-8")
-
-    if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue)
-      throw new KafkaException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".")
-
-    buffer.putInt(partitions.length)
-    partitions.foreach(buffer.putInt(_))
 
-    buffer.putInt(offsets.length)
-    offsets.foreach(buffer.putLong(_))
-
-    buffer.putInt(fetchSizes.length)
-    fetchSizes.foreach(buffer.putInt(_))
-  }
+case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
-  def sizeInBytes(): Int = {
-    2 + topic.length() +                              // topic string
-      partitions.foldLeft(4)((s, _) => s + 4) +       // each request partition (int)
-      offsets.foldLeft(4)((s, _) => s + 8) +          // each request offset (long)
-      fetchSizes.foldLeft(4)((s,_) => s + 4)          // each request fetch size
-  }
-}
 
 object FetchRequest {
   val CurrentVersion = 1.shortValue()
@@ -85,18 +38,23 @@ object FetchRequest {
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = Utils.readShortString(buffer, "UTF-8")
+    val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val replicaId = buffer.getInt
     val maxWait = buffer.getInt
     val minBytes = buffer.getInt
-    val offsetsCount = buffer.getInt
-    val offsetInfo = new Array[OffsetDetail](offsetsCount)
-    for(i <- 0 until offsetInfo.length)
-      offsetInfo(i) = OffsetDetail.readFrom(buffer)
-
-    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val offset = buffer.getLong
+        val fetchSize = buffer.getInt
+        (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize))
+      })
+    })
+    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
   }
-
 }
 
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
@@ -105,50 +63,63 @@ case class FetchRequest(versionId: Short
                         replicaId: Int = FetchRequest.DefaultReplicaId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
-                        offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
+                        requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
+        extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
 
-  // ensure that a topic "X" appears in at most one OffsetDetail
-  def validate() {
-    if(offsetInfo == null)
-      throw new FetchRequestFormatException("FetchRequest has null offsetInfo")
-
-    // We don't want to get fancy with groupBy's and filter's since we just want the first occurrence
-    var topics = Set[String]()
-    val iter = offsetInfo.iterator
-    while(iter.hasNext) {
-      val offsetData = iter.next()
-      val topic = offsetData.topic
-      if(topics.contains(topic))
-        throw new FetchRequestFormatException("FetchRequest has multiple OffsetDetails for topic: " + topic)
-      else
-        topics += topic
-    }
-  }
+  /**
+   * Partitions the request info into a map of maps (one for each topic).
+   */
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
   def writeTo(buffer: ByteBuffer) {
-    // validate first
-    validate()
-
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
-    Utils.writeShortString(buffer, clientId, "UTF-8")
+    Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
     buffer.putInt(replicaId)
     buffer.putInt(maxWait)
     buffer.putInt(minBytes)
-    buffer.putInt(offsetInfo.size)
-    for(topicDetail <- offsetInfo) {
-      topicDetail.writeTo(buffer)
+    buffer.putInt(requestInfoGroupedByTopic.size) // topic count
+    requestInfoGroupedByTopic.foreach {
+      case (topic, partitionFetchInfos) =>
+        Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+        buffer.putInt(partitionFetchInfos.size) // partition count
+        partitionFetchInfos.foreach {
+          case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) =>
+            buffer.putInt(partition)
+            buffer.putLong(offset)
+            buffer.putInt(fetchSize)
+        }
     }
   }
 
-  def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
+  def sizeInBytes: Int = {
+    2 + /* versionId */
+    4 + /* correlationId */
+    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
+    4 + /* replicaId */
+    4 + /* maxWait */
+    4 + /* minBytes */
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+      val (topic, partitionFetchInfos) = currTopic
+      foldedTopics +
+      Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+      4 + /* partition count */
+      partitionFetchInfos.size * (
+        4 + /* partition id */
+        8 + /* offset */
+        4 /* fetch size */
+      )
+    })
+  }
 
-  def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
+  def isFromFollower = replicaId != FetchRequest.NonFollowerId
 
-  def isFromFollower(): Boolean = replicaId != FetchRequest.NonFollowerId
+  def numPartitions = requestInfo.size
 }
 
 
+@nonthreadsafe
 class FetchRequestBuilder() {
   private var correlationId = FetchRequest.DefaultCorrelationId
   private val versionId = FetchRequest.CurrentVersion
@@ -156,13 +127,10 @@ class FetchRequestBuilder() {
   private var replicaId = FetchRequest.DefaultReplicaId
   private var maxWait = FetchRequest.DefaultMaxWait
   private var minBytes = FetchRequest.DefaultMinBytes
-  private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]
+  private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
-    val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]()))
-    topicData._1.append(partition)
-    topicData._2.append(offset)
-    topicData._3.append(fetchSize)
+    requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
     this
   }
 
@@ -191,10 +159,5 @@ class FetchRequestBuilder() {
     this
   }
 
-  def build() = {
-    val offsetDetails = requestMap.map{ topicData =>
-      new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray)
-    }
-    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail])
-  }
+  def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Mon Sep 17 20:15:59 2012
@@ -19,27 +19,35 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import java.nio.channels.GatheringByteChannel
-import kafka.common.ErrorMapping
+import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.network.{MultiSend, Send}
 import kafka.utils.Utils
 
 object PartitionData {
   def readFrom(buffer: ByteBuffer): PartitionData = {
-    val error = buffer.getShort
     val partition = buffer.getInt
+    val error = buffer.getShort
     val initialOffset = buffer.getLong
-    val hw = buffer.getLong()
+    val hw = buffer.getLong
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
     new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
   }
+
+  val headerSize =
+    4 + /* partition */
+    2 + /* error code */
+    8 + /* initialOffset */
+    8 + /* high watermark */
+    4 /* messageSetSize */
 }
 
 case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
-  val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8
+
+  val sizeInBytes = PartitionData.headerSize + messages.sizeInBytes.intValue()
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
 }
@@ -50,17 +58,17 @@ class PartitionDataSend(val partitionDat
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(26)
-  buffer.putShort(partitionData.error)
+  private val buffer = ByteBuffer.allocate(PartitionData.headerSize)
   buffer.putInt(partitionData.partition)
+  buffer.putShort(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
   buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes.intValue())
   buffer.rewind()
 
-  def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
+  override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
 
-  def writeTo(channel: GatheringByteChannel): Int = {
+  override def writeTo(channel: GatheringByteChannel): Int = {
     var written = 0
     if(buffer.hasRemaining)
       written += channel.write(buffer)
@@ -75,63 +83,43 @@ class PartitionDataSend(val partitionDat
 
 object TopicData {
   def readFrom(buffer: ByteBuffer): TopicData = {
-    val topic = Utils.readShortString(buffer, "UTF-8")
+    val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val partitionCount = buffer.getInt
-    val partitions = new Array[PartitionData](partitionCount)
-    for(i <- 0 until partitionCount)
-      partitions(i) = PartitionData.readFrom(buffer)
-    new TopicData(topic, partitions.sortBy(_.partition))
-  }
-
-  def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = {
-    if(data == null || data.size == 0)
-      return None
-
-    var (low, high) = (0, data.size-1)
-    while(low <= high) {
-      val mid = (low + high) / 2
-      val found = data(mid)
-      if(found.partition == partition)
-        return Some(found)
-      else if(partition < found.partition)
-        high = mid - 1
-      else
-        low = mid + 1
-    }
-    None
+    val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
+      val partitionData = PartitionData.readFrom(buffer)
+      (TopicAndPartition(topic, partitionData.partition), partitionData)
+    })
+    TopicData(topic, Map(topicPartitionDataPairs:_*))
   }
+
+  def headerSize(topic: String) =
+    Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+    4 /* partition count */
 }
 
-case class TopicData(topic: String, partitionDataArray: Array[PartitionData]) {
-  val sizeInBytes = 2 + topic.length + partitionDataArray.foldLeft(4)(_ + _.sizeInBytes)
+case class TopicData(topic: String, partitionData: Map[TopicAndPartition, PartitionData]) {
+  val sizeInBytes =
+    TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes)
 
-  // need to override equals due to brokern java-arrays equals functionality
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: TopicData =>
-        ( topic == that.topic &&
-          partitionDataArray.toSeq == that.partitionDataArray.toSeq )
-      case _ => false
-    }
-  }
+  val headerSize = TopicData.headerSize(topic)
 }
 
 class TopicDataSend(val topicData: TopicData) extends Send {
-  val size = topicData.sizeInBytes
+  private val size = topicData.sizeInBytes
 
-  var sent = 0
+  private var sent = 0
 
-  private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
-  Utils.writeShortString(buffer, topicData.topic, "UTF-8")
-  buffer.putInt(topicData.partitionDataArray.length)
+  override def complete = sent >= size
+
+  private val buffer = ByteBuffer.allocate(topicData.headerSize)
+  Utils.writeShortString(buffer, topicData.topic, RequestOrResponse.DefaultCharset)
+  buffer.putInt(topicData.partitionData.size)
   buffer.rewind()
 
-  val sends = new MultiSend(topicData.partitionDataArray.map(new PartitionDataSend(_)).toList) {
-    val expectedBytesToWrite = topicData.partitionDataArray.foldLeft(0)(_ + _.sizeInBytes)
+  val sends = new MultiSend(topicData.partitionData.toList.map(d => new PartitionDataSend(d._2))) {
+    val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize
   }
 
-  def complete = sent >= size
-
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()
     var written = 0
@@ -146,68 +134,87 @@ class TopicDataSend(val topicData: Topic
 }
 
 
+object FetchResponse {
 
+  val headerSize =
+    2 + /* versionId */
+    4 + /* correlationId */
+    4 /* topic count */
 
-object FetchResponse {
   def readFrom(buffer: ByteBuffer): FetchResponse = {
     val versionId = buffer.getShort
-    val errorCode = buffer.getShort
     val correlationId = buffer.getInt
-    val dataCount = buffer.getInt
-    val data = new Array[TopicData](dataCount)
-    for(i <- 0 until data.length)
-      data(i) = TopicData.readFrom(buffer)
-    new FetchResponse(versionId, correlationId, data, errorCode)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topicData = TopicData.readFrom(buffer)
+      topicData.partitionData.values.map(
+        partitionData => (TopicAndPartition(topicData.topic, partitionData.partition), partitionData)
+      )
+    })
+    FetchResponse(versionId, correlationId, Map(pairs:_*))
   }
 }
 
 
 case class FetchResponse(versionId: Short,
                          correlationId: Int,
-                         data: Array[TopicData],
-                         errorCode: Short = ErrorMapping.NoError)  {
+                         data: Map[TopicAndPartition, PartitionData])  {
 
-  val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes)
+  /**
+   * Partitions the data into a map of maps (one for each topic).
+   */
+  lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
 
-  lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
+  val sizeInBytes =
+    FetchResponse.headerSize +
+    dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
+      val topicData = TopicData(curr._1, curr._2)
+      folded +
+      topicData.sizeInBytes
+    })
 
-  def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
-    val messageSet = topicMap.get(topic) match {
-      case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionDataArray, partition).map(_.messages).getOrElse(MessageSet.Empty)
-      case None =>
-        MessageSet.Empty
+  private def partitionDataFor(topic: String, partition: Int): PartitionData = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    data.get(topicAndPartition) match {
+      case Some(partitionData) => partitionData
+      case _ =>
+        throw new IllegalArgumentException(
+          "No partition %s in fetch response %s".format(topicAndPartition, this.toString))
     }
-    messageSet.asInstanceOf[ByteBufferMessageSet]
   }
 
-  def highWatermark(topic: String, partition: Int): Long = {
-    topicMap.get(topic) match {
-      case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionDataArray, partition).map(_.hw).getOrElse(-1L)
-      case None => -1L
-    }
-  }
+  def messageSet(topic: String, partition: Int): ByteBufferMessageSet =
+    partitionDataFor(topic, partition).messages.asInstanceOf[ByteBufferMessageSet]
+
+  def highWatermark(topic: String, partition: Int) = partitionDataFor(topic, partition).hw
+
+  def hasError = data.values.exists(_.error != ErrorMapping.NoError)
+
+  def errorCode(topic: String, partition: Int) = partitionDataFor(topic, partition).error
 }
 
 
 class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
   private val size = fetchResponse.sizeInBytes
+
   private var sent = 0
-  
-  private val buffer = ByteBuffer.allocate(16)
+
+  private val sendSize = 4 /* for size */ + size
+
+  override def complete = sent >= sendSize
+
+  private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
   buffer.putInt(size)
   buffer.putShort(fetchResponse.versionId)
-  buffer.putShort(fetchResponse.errorCode)
   buffer.putInt(fetchResponse.correlationId)
-  buffer.putInt(fetchResponse.data.length)
+  buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
   buffer.rewind()
-  
-  val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) {
-    val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes)
-  }
 
-  def complete = sent >= sendSize
+  val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
+    case(topic, data) => new TopicDataSend(TopicData(topic, data))
+  }) {
+    val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize
+  }
 
   def writeTo(channel: GatheringByteChannel):Int = {
     expectIncomplete()
@@ -220,6 +227,5 @@ class FetchResponseSend(val fetchRespons
     sent += written
     written
   }
-
-  def sendSize = 4 + fetchResponse.sizeInBytes
 }
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Mon Sep 17 20:15:59 2012
@@ -20,96 +20,105 @@ package kafka.api
 import java.nio._
 import kafka.message._
 import kafka.utils._
+import scala.collection.Map
+import kafka.common.TopicAndPartition
 
 
 object ProducerRequest {
   val CurrentVersion: Short = 0
-  
+
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
     val correlationId: Int = buffer.getInt
-    val clientId: String = Utils.readShortString(buffer, "UTF-8")
+    val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val requiredAcks: Short = buffer.getShort
     val ackTimeoutMs: Int = buffer.getInt
     //build the topic structure
     val topicCount = buffer.getInt
-    val data = new Array[TopicData](topicCount)
-    for(i <- 0 until topicCount) {
-      val topic = Utils.readShortString(buffer, "UTF-8")
-      		
+    val partitionDataPairs = (1 to topicCount).flatMap(_ => {
+      // process topic
+      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
       val partitionCount = buffer.getInt
-      //build the partition structure within this topic
-      val partitionData = new Array[PartitionData](partitionCount)
-      for (j <- 0 until partitionCount) {
+      (1 to partitionCount).map(_ => {
         val partition = buffer.getInt
         val messageSetSize = buffer.getInt
         val messageSetBuffer = new Array[Byte](messageSetSize)
         buffer.get(messageSetBuffer,0,messageSetSize)
-        partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
-      }
-      data(i) = new TopicData(topic,partitionData)
-    }
-    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
+        (TopicAndPartition(topic, partition),
+         new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
+      })
+    })
+
+    ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*))
   }
 }
 
-case class ProducerRequest( versionId: Short,
+case class ProducerRequest( versionId: Short = ProducerRequest.CurrentVersion,
                             correlationId: Int,
                             clientId: String,
                             requiredAcks: Short,
                             ackTimeoutMs: Int,
-                            data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
+                            data: Map[TopicAndPartition, PartitionData])
+    extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
-  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) =
+  /**
+   * Partitions the data into a map of maps (one for each topic).
+   */
+  private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
+
+  def this(correlationId: Int,
+           clientId: String,
+           requiredAcks: Short,
+           ackTimeoutMs: Int,
+           data: Map[TopicAndPartition, PartitionData]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
-    Utils.writeShortString(buffer, clientId, "UTF-8")
+    Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
     buffer.putShort(requiredAcks)
     buffer.putInt(ackTimeoutMs)
+
     //save the topic structure
-    buffer.putInt(data.size) //the number of topics
-    for(topicData <- data) {
-      Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic
-      buffer.putInt(topicData.partitionDataArray.size) //the number of partitions
-      for(partitionData <- topicData.partitionDataArray) {
-        buffer.putInt(partitionData.partition)
-        buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit)
-        buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer)
-        partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind
-      }
+    buffer.putInt(dataGroupedByTopic.size) //the number of topics
+    dataGroupedByTopic.foreach {
+      case (topic, topicAndPartitionData) =>
+        Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic
+        buffer.putInt(topicAndPartitionData.size) //the number of partitions
+        topicAndPartitionData.foreach(partitionAndData => {
+          val partitionData = partitionAndData._2
+          buffer.putInt(partitionData.partition)
+          buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit)
+          buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer)
+          partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind
+        })
     }
   }
 
-  def sizeInBytes(): Int = {
-    var size = 0 
-    //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
-    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4
-    for(topicData <- data) {
-	    size += 2 + topicData.topic.length + 4
-      for(partitionData <- topicData.partitionDataArray) {
-        size += 4 + 4 + partitionData.messages.sizeInBytes.asInstanceOf[Int]
+  def sizeInBytes: Int = {
+    2 + /* versionId */
+    4 + /* correlationId */
+    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */
+    2 + /* requiredAcks */
+    4 + /* ackTimeoutMs */
+    4 + /* number of topics */
+    dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+      foldedTopics +
+      Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+      4 + /* the number of partitions */
+      {
+        currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {
+          foldedPartitions +
+          4 + /* partition id */
+          4 + /* byte-length of serialized messages */
+          currPartition._2.messages.sizeInBytes.toInt
+        })
       }
-    }
-    size
-  }
-
-  // need to override case-class equals due to broken java-array equals()
-  override def equals(other: Any): Boolean = {
-   other match {
-      case that: ProducerRequest =>
-        ( correlationId == that.correlationId &&
-          clientId == that.clientId &&
-          requiredAcks == that.requiredAcks &&
-          ackTimeoutMs == that.ackTimeoutMs &&
-          data.toSeq == that.data.toSeq )
-      case _ => false
-    }
+    })
   }
 
-  def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDataArray.length)
+  def numPartitions = data.size
 
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala Mon Sep 17 20:15:59 2012
@@ -18,57 +18,81 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
+import kafka.utils.Utils
+import scala.collection.Map
+import kafka.common.{TopicAndPartition, ErrorMapping}
 
 
 object ProducerResponse {
   def readFrom(buffer: ByteBuffer): ProducerResponse = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
-    val errorsSize = buffer.getInt
-    val errors = new Array[Short](errorsSize)
-    for( i <- 0 until errorsSize) {
-      errors(i) = buffer.getShort
-    }
-    val offsetsSize = buffer.getInt
-    val offsets = new Array[Long](offsetsSize)
-    for( i <- 0 until offsetsSize) {
-      offsets(i) = buffer.getLong
-    }
-    new ProducerResponse(versionId, correlationId, errors, offsets, errorCode)
+    val topicCount = buffer.getInt
+    val statusPairs = (1 to topicCount).flatMap(_ => {
+      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partition = buffer.getInt
+        val error = buffer.getShort
+        val offset = buffer.getLong
+        (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset))
+      })
+    })
+
+    ProducerResponse(versionId, correlationId, Map(statusPairs:_*))
   }
 }
 
-case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short],
-                            offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
-  val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
+case class ProducerResponseStatus(error: Short, nextOffset: Long)
+
+
+case class ProducerResponse(versionId: Short,
+                            correlationId: Int,
+                            status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse {
+
+  /**
+   * Partitions the status map into a map of maps (one for each topic).
+   */
+  private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
+
+  def hasError = status.values.exists(_.error != ErrorMapping.NoError)
+
+  val sizeInBytes = {
+    val groupedStatus = statusGroupedByTopic
+    2 + /* version id */
+    4 + /* correlation id */
+    4 + /* topic count */
+    groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
+      foldedTopics +
+      Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+      4 + /* partition count for this topic */
+      currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => {
+        foldedPartitions +
+        4 + /* partition id */
+        2 + /* error code */
+        8 /* offset */
+      })
+    })
+  }
 
   def writeTo(buffer: ByteBuffer) {
-    /* version id */
+    val groupedStatus = statusGroupedByTopic
+
     buffer.putShort(versionId)
-    /* correlation id */
     buffer.putInt(correlationId)
-    /* error code */
-    buffer.putShort(errorCode)
-    /* errors */
-    buffer.putInt(errors.length)
-    errors.foreach(buffer.putShort(_))
-    /* offsets */
-    buffer.putInt(offsets.length)
-    offsets.foreach(buffer.putLong(_))
-  }
+    buffer.putInt(groupedStatus.size) // topic count
 
-  // need to override case-class equals due to broken java-array equals()
-  override def equals(other: Any): Boolean = {
-   other match {
-      case that: ProducerResponse =>
-        ( correlationId == that.correlationId &&
-          versionId == that.versionId &&
-          errorCode == that.errorCode &&
-          errors.toSeq == that.errors.toSeq &&
-          offsets.toSeq == that.offsets.toSeq)
-      case _ => false
-    }
+    groupedStatus.foreach(topicStatus => {
+      val (topic, errorsAndOffsets) = topicStatus
+      Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+      buffer.putInt(errorsAndOffsets.size) // partition count
+      errorsAndOffsets.foreach {
+        case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset))) =>
+          buffer.putInt(partition)
+          buffer.putShort(error)
+          buffer.putLong(nextOffset)
+      }
+    })
   }
-}
\ No newline at end of file
+}
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala Mon Sep 17 20:15:59 2012
@@ -19,6 +19,12 @@ package kafka.api
 
 import java.nio._
 
+
+object RequestOrResponse {
+  val DefaultCharset = "UTF-8"
+}
+
+
 private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
 
   def sizeInBytes: Int

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Mon Sep 17 20:15:59 2012
@@ -34,13 +34,12 @@ object ErrorMapping {
   val InvalidMessageCode : Short = 2
   val UnknownTopicOrPartitionCode : Short = 3
   val InvalidFetchSizeCode  : Short = 4
-  val InvalidFetchRequestFormatCode : Short = 5
-  val LeaderNotAvailableCode : Short = 6
-  val NotLeaderForPartitionCode : Short = 7
-  val RequestTimedOutCode: Short = 8
-  val BrokerNotAvailableCode: Short = 9
-  val ReplicaNotAvailableCode: Short = 10
-  val MessageSizeTooLargeCode: Short = 11
+  val LeaderNotAvailableCode : Short = 5
+  val NotLeaderForPartitionCode : Short = 6
+  val RequestTimedOutCode: Short = 7
+  val BrokerNotAvailableCode: Short = 8
+  val ReplicaNotAvailableCode: Short = 9
+  val MessageSizeTooLargeCode: Short = 10
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -48,7 +47,6 @@ object ErrorMapping {
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
       classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode,
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
-      classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala?rev=1386806&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala Mon Sep 17 20:15:59 2012
@@ -0,0 +1,29 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class since (topic, partition) pairs are ubiquitous.
+ */
+case class TopicAndPartition(topic: String, partition: Int) {
+
+  def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
+
+  def asTuple = (topic, partition)
+}
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Mon Sep 17 20:15:59 2012
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
+import kafka.common.TopicAndPartition
 
 
 /**
@@ -38,7 +39,7 @@ class ConsumerFetcherManager(private val
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
   private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null
   private var cluster: Cluster = null
-  private val noLeaderPartitionSet = new mutable.HashSet[(String, Int)]
+  private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
   private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){
@@ -48,21 +49,22 @@ class ConsumerFetcherManager(private val
       try {
         if (noLeaderPartitionSet.isEmpty)
           cond.await()
-        for ((topic, partitionId) <- noLeaderPartitionSet) {
-          // find the leader for this partition
-          getLeaderForPartition(zkClient, topic, partitionId) match {
-            case Some(leaderId) =>
-              cluster.getBroker(leaderId) match {
-                case Some(broker) =>
-                  val pti = partitionMap((topic, partitionId))
-                  addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
-                  noLeaderPartitionSet.remove((topic, partitionId))
-                case None =>
-                  error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
-                                .format(leaderId, topic, partitionId))
-              }
-            case None => // let it go since we will keep retrying
-          }
+        noLeaderPartitionSet.foreach {
+          case(TopicAndPartition(topic, partitionId)) =>
+            // find the leader for this partition
+            getLeaderForPartition(zkClient, topic, partitionId) match {
+              case Some(leaderId) =>
+                cluster.getBroker(leaderId) match {
+                  case Some(broker) =>
+                    val pti = partitionMap((topic, partitionId))
+                    addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
+                    noLeaderPartitionSet.remove(TopicAndPartition(topic, partitionId))
+                  case None =>
+                    error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
+                                  .format(leaderId, topic, partitionId))
+                }
+              case None => // let it go since we will keep retrying
+            }
         }
       } finally {
         lock.unlock()
@@ -84,7 +86,7 @@ class ConsumerFetcherManager(private val
     try {
       partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap
       this.cluster = cluster
-      noLeaderPartitionSet ++= topicInfos.map(tpi => (tpi.topic, tpi.partitionId))
+      noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
       cond.signalAll()
     } finally {
       lock.unlock()
@@ -117,7 +119,7 @@ class ConsumerFetcherManager(private val
     pti      
   }
 
-  def addPartitionsWithError(partitionList: Iterable[(String, Int)]) {
+  def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
     debug("adding partitions with error %s".format(partitionList))
     lock.lock()
     try {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Mon Sep 17 20:15:59 2012
@@ -21,6 +21,8 @@ import kafka.cluster.Broker
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{FetchRequest, OffsetRequest, PartitionData}
+import kafka.common.TopicAndPartition
+
 
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
@@ -57,7 +59,7 @@ class ConsumerFetcherThread(name: String
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
+  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
 }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala?rev=1386806&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala Mon Sep 17 20:15:59 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import scala.collection.JavaConversions
+import kafka.api.PartitionFetchInfo
+import java.nio.ByteBuffer
+import kafka.common.TopicAndPartition
+
+
+class FetchRequest(correlationId: Int,
+                   clientId: String,
+                   replicaId: Int,
+                   maxWait: Int,
+                   minBytes: Int,
+                   requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
+
+  val underlying = {
+    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    kafka.api.FetchRequest(
+      correlationId = correlationId,
+      clientId = clientId,
+      replicaId = replicaId,
+      maxWait = maxWait,
+      minBytes = minBytes,
+      requestInfo = scalaMap
+    )
+  }
+
+  def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
+
+  def sizeInBytes = underlying.sizeInBytes
+
+  override def toString = underlying.toString
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherFetchRequest = other.asInstanceOf[kafka.javaapi.FetchRequest]
+    this.underlying.equals(otherFetchRequest.underlying)
+  }
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchRequest]
+
+  override def hashCode = underlying.hashCode
+
+}
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala Mon Sep 17 20:15:59 2012
@@ -17,17 +17,33 @@
 
 package kafka.javaapi
 
-import kafka.api.TopicData
+import kafka.api.PartitionData
+import kafka.common.TopicAndPartition
 
 
 class FetchResponse( val versionId: Short,
                      val correlationId: Int,
-                     private val data: Array[TopicData] ) {
+                     private val data: Map[TopicAndPartition, PartitionData] ) {
 
-  private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
+  private val underlying = kafka.api.FetchResponse(versionId, correlationId, data)
 
   def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
     import Implicits._
     underlying.messageSet(topic, partition)
   }
+
+  def highWatermark(topic: String, partition: Int) = underlying.highWatermark(topic, partition)
+
+  def hasError = underlying.hasError
+
+  def errorCode(topic: String, partition: Int) = underlying.errorCode(topic, partition)
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherFetchResponse = other.asInstanceOf[kafka.javaapi.FetchResponse]
+    this.underlying.equals(otherFetchResponse.underlying)
+  }
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchResponse]
+
+  override def hashCode = underlying.hashCode
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala Mon Sep 17 20:15:59 2012
@@ -17,9 +17,8 @@
 
 package kafka.javaapi.consumer
 
-import kafka.api.FetchRequest
-import kafka.javaapi.FetchResponse
 import kafka.utils.threadsafe
+import kafka.javaapi.FetchResponse
 
 /**
  * A consumer of kafka messages
@@ -32,15 +31,28 @@ class SimpleConsumer(val host: String,
   val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
 
   /**
-   *  Fetch a set of messages from a topic.
+   *  Fetch a set of messages from a topic. This version of the fetch method
+   *  takes the Scala version of a fetch request (i.e.,
+   *  [[kafka.api.FetchRequest]] and is intended for use with the
+   *  [[kafka.api.FetchRequestBuilder]].
    *
    *  @param request  specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
    *  @return a set of fetched messages
    */
-  def fetch(request: FetchRequest): FetchResponse = {
+  def fetch(request: kafka.api.FetchRequest): FetchResponse = {
     import kafka.javaapi.Implicits._
     underlying.fetch(request)
   }
+  
+  /**
+   *  Fetch a set of messages from a topic.
+   *
+   *  @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+   *  @return a set of fetched messages
+   */
+  def fetch(request: kafka.javaapi.FetchRequest): FetchResponse = {
+    fetch(request.underlying)
+  }
 
   /**
    *  Get a list of valid offsets (up to maxSize) before the given time.

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Mon Sep 17 20:15:59 2012
@@ -23,12 +23,14 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.nio.ByteBuffer
 import kafka.api._
+import kafka.common.TopicAndPartition
+
 
 object RequestChannel {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
   def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Array[TopicData]())
+    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, PartitionData]())
     val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
     byteBuffer.putShort(RequestKeys.ProduceKey)
     emptyProducerRequest.writeTo(byteBuffer)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Mon Sep 17 20:15:59 2012
@@ -178,4 +178,4 @@ class SyncProducer(val config: SyncProdu
 object ProducerRequestStat extends KafkaMetricsGroup {
   val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
   val requestSizeHist = newHistogram("ProducerRequestSize")
-}
\ No newline at end of file
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Mon Sep 17 20:15:59 2012
@@ -18,13 +18,13 @@
 package kafka.producer.async
 
 import kafka.common._
-import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging}
-import scala.collection.Map
+import scala.collection.{Seq, Map}
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api._
+import kafka.api.{TopicMetadata, ProducerRequest, PartitionData}
 
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -81,12 +81,13 @@ class DefaultEventHandler[K,V](config: P
             val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
 
             val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-            for( (topic, partition) <- failedTopicPartitions ) {
-              eventsPerBrokerMap.get((topic, partition)) match {
+            failedTopicPartitions.foreach(topicPartition => {
+              eventsPerBrokerMap.get(topicPartition) match {
                 case Some(data) => failedProduceRequests.appendAll(data)
                 case None => // nothing
+
               }
-            }
+            })
           }
         } catch {
           case t: Throwable => error("Failed to send messages", t)
@@ -112,7 +113,7 @@ class DefaultEventHandler[K,V](config: P
             else {
               // currently, if in async mode, we just log the serialization error. We need to revisit
               // this when doing kafka-496
-              error("Error serializing message " + t)
+              error("Error serializing message ", t)
             }
         }
       }
@@ -122,8 +123,8 @@ class DefaultEventHandler[K,V](config: P
     serializedProducerData
   }
 
-  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = {
-    val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]
+  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]] = {
+    val ret = new HashMap[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]
     try {
       for (event <- events) {
         val topicPartitionsList = getPartitionListForTopic(event)
@@ -135,16 +136,16 @@ class DefaultEventHandler[K,V](config: P
         // postpone the failure until the send operation, so that requests for other brokers are handled correctly
         val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
 
-        var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
+        var dataPerBroker: HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]] = null
         ret.get(leaderBrokerId) match {
           case Some(element) =>
-            dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
+            dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]]
           case None =>
-            dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
+            dataPerBroker = new HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]
             ret.put(leaderBrokerId, dataPerBroker)
         }
 
-        val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
+        val topicAndPartition = TopicAndPartition(event.getTopic, brokerPartition.partitionId)
         var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
         dataPerBroker.get(topicAndPartition) match {
           case Some(element) =>
@@ -199,39 +200,30 @@ class DefaultEventHandler[K,V](config: P
    * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
    * @return the set (topic, partitions) messages which incurred an error sending or processing
    */
-  private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Seq[(String, Int)] = {
+  private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = {
     if(brokerId < 0) {
-      warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
+      warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
-      val topics = new HashMap[String, ListBuffer[PartitionData]]()
-      for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
-        val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]())
-        partitionData.append(new PartitionData(partitionId, messagesSet))
+      val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
+        case (topicAndPartition, messages) =>
+          (topicAndPartition, new PartitionData(topicAndPartition.partition, messages))
       }
-      val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
       val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
-        config.requestTimeoutMs, topicData)
+        config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
-        trace("producer sent messages for topics %s to broker %d on %s:%d"
+        trace("Producer sent messages for topics %s to broker %d on %s:%d"
           .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
-        var msgIdx = -1
-        val errors = new ListBuffer[(String, Int)]
-        for( topic <- topicData; partition <- topic.partitionDataArray ) {
-          msgIdx += 1
-          if (msgIdx >= response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError) {
-            errors.append((topic.topic, partition.partition))
-            if (msgIdx < response.errors.size)
-              warn("Received error " + ErrorMapping.exceptionFor(response.errors(msgIdx)) +
-                   "from broker %d on %s:%d".format(brokerId, topic.topic, partition.partition))
-          }
-        }
-        errors
+        if (response.status.size != producerRequest.data.size)
+          throw new KafkaException("Incomplete response (%s) for producer request (%s)"
+                                           .format(response, producerRequest))
+        response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+          .map(partitionStatus => partitionStatus._1)
       } catch {
-        case e =>
-          warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), e)
+        case t: Throwable =>
+          warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), t)
           messagesPerTopic.keys.toSeq
       }
     } else {
@@ -239,7 +231,7 @@ class DefaultEventHandler[K,V](config: P
     }
   }
 
-  private def groupMessagesToSet(eventsPerTopicAndPartition: Map[(String,Int), Seq[ProducerData[K,Message]]]): Map[(String, Int), ByteBufferMessageSet] = {
+  private def groupMessagesToSet(eventsPerTopicAndPartition: Map[TopicAndPartition, Seq[ProducerData[K,Message]]]) = {
     /** enforce the compressed.topics config here.
      *  If the compression codec is anything other than NoCompressionCodec,
      *    Enable compression only for specified topics if any
@@ -255,25 +247,23 @@ class DefaultEventHandler[K,V](config: P
       ( topicAndPartition,
         config.compressionCodec match {
           case NoCompressionCodec =>
-            trace("Sending %d messages with no compression to topic %s on partition %d"
-              .format(messages.size, topicAndPartition._1, topicAndPartition._2))
+            trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
             new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
           case _ =>
             config.compressedTopics.size match {
               case 0 =>
-                trace("Sending %d messages with compression codec %d to topic %s on partition %d"
-                  .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+                trace("Sending %d messages with compression codec %d to %s"
+                  .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                 new ByteBufferMessageSet(config.compressionCodec, messages: _*)
               case _ =>
-                if(config.compressedTopics.contains(topicAndPartition._1)) {
-                  trace("Sending %d messages with compression codec %d to topic %s on partition %d"
-                    .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+                if(config.compressedTopics.contains(topicAndPartition.topic)) {
+                  trace("Sending %d messages with compression codec %d to %s"
+                    .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                   new ByteBufferMessageSet(config.compressionCodec, messages: _*)
                 }
                 else {
-                  trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
-                    .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
-                    config.compressedTopics.toString))
+                  trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+                    .format(messages.size, topicAndPartition, config.compressedTopics.toString))
                   new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
                 }
             }

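The branching in groupMessagesToSet above amounts to: compress only when a codec is configured and the topic is covered by compressed.topics (or that list is empty). A small stand-alone sketch of that decision, under the assumption of the kafka.message codec types and with a hypothetical helper name, might read:

    import kafka.message.{CompressionCodec, NoCompressionCodec}

    // Hypothetical helper: pick the codec for a topic under the compressed.topics rule.
    def codecFor(topic: String,
                 configuredCodec: CompressionCodec,
                 compressedTopics: Seq[String]): CompressionCodec =
      if (configuredCodec == NoCompressionCodec) NoCompressionCodec
      else if (compressedTopics.isEmpty || compressedTopics.contains(topic)) configuredCodec
      else NoCompressionCodec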
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Mon Sep 17 20:15:59 2012
@@ -19,7 +19,7 @@ package kafka.server
 
 import kafka.cluster.Broker
 import kafka.consumer.SimpleConsumer
-import kafka.common.ErrorMapping
+import kafka.common.{TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit
 abstract class  AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
-  private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map
+  private val fetchMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val fetchMapLock = new Object
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
   val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
@@ -50,7 +50,7 @@ abstract class  AbstractFetcherThread(na
   def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
 
   // deal with partitions with errors, potentially due to leadership changes
-  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)])
+  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
 
   override def shutdown(){
     super.shutdown()
@@ -65,12 +65,15 @@ abstract class  AbstractFetcherThread(na
             minBytes(minBytes)
 
     fetchMapLock synchronized {
-      for ( ((topic, partitionId), offset) <- fetchMap )
-        builder.addFetch(topic, partitionId, offset.longValue, fetchSize)
+      fetchMap.foreach {
+        case((topicAndPartition, offset)) =>
+          builder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+                           offset, fetchSize)
+      }
     }
 
     val fetchRequest = builder.build()
-    val partitionsWithError = new mutable.HashSet[(String, Int)]
+    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
     var response: FetchResponse = null
     try {
       trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
@@ -90,37 +93,35 @@ abstract class  AbstractFetcherThread(na
     if (response != null) {
       // process fetched data
       fetchMapLock synchronized {
-        for ( topicData <- response.data ) {
-          for ( partitionData <- topicData.partitionDataArray) {
-            val topic = topicData.topic
-            val partitionId = partitionData.partition
-            val key = (topic, partitionId)
-            val currentOffset = fetchMap.get(key)
+        response.data.foreach {
+          case(topicAndPartition, partitionData) =>
+            val (topic, partitionId) = topicAndPartition.asTuple
+            val currentOffset = fetchMap.get(topicAndPartition)
             if (currentOffset.isDefined) {
               partitionData.error match {
                 case ErrorMapping.NoError =>
                   processPartitionData(topic, currentOffset.get, partitionData)
                   val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
-                  val newOffset = currentOffset.get + validBytes
-                  fetchMap.put(key, newOffset)
+                  val newOffset = currentOffset.get + validBytes
+                  fetchMap.put(topicAndPartition, newOffset)
                   FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
                   fetcherMetrics.byteRate.mark(validBytes)
                 case ErrorMapping.OffsetOutOfRangeCode =>
                   val newOffset = handleOffsetOutOfRange(topic, partitionId)
-                  fetchMap.put(key, newOffset)
+                  fetchMap.put(topicAndPartition, newOffset)
                   warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                               .format(currentOffset.get, topic, partitionId, newOffset))
+                    .format(currentOffset.get, topic, partitionId, newOffset))
                 case _ =>
                   error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
-                        ErrorMapping.exceptionFor(partitionData.error))
-                  partitionsWithError += key
-                  fetchMap.remove(key)
+                    ErrorMapping.exceptionFor(partitionData.error))
+                  partitionsWithError += topicAndPartition
+                  fetchMap.remove(topicAndPartition)
               }
             }
-          }
         }
       }
     }
+
     if (partitionsWithError.size > 0) {
       debug("handling partitions with error for %s".format(partitionsWithError))
       handlePartitionsWithErrors(partitionsWithError)
@@ -129,19 +130,19 @@ abstract class  AbstractFetcherThread(na
 
   def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
     fetchMapLock synchronized {
-      fetchMap.put((topic, partitionId), initialOffset)
+      fetchMap.put(TopicAndPartition(topic, partitionId), initialOffset)
     }
   }
 
   def removePartition(topic: String, partitionId: Int) {
     fetchMapLock synchronized {
-      fetchMap.remove((topic, partitionId))
+      fetchMap.remove(TopicAndPartition(topic, partitionId))
     }
   }
 
   def hasPartition(topic: String, partitionId: Int): Boolean = {
     fetchMapLock synchronized {
-      fetchMap.get((topic, partitionId)).isDefined
+      fetchMap.get(TopicAndPartition(topic, partitionId)).isDefined
     }
   }
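
For clarity, the partition bookkeeping above boils down to an offset map keyed by TopicAndPartition. A minimal stand-alone sketch (locking and the real fetch loop omitted, topic name invented):

    import kafka.common.TopicAndPartition
    import scala.collection.mutable

    val fetchMap = new mutable.HashMap[TopicAndPartition, Long]
    val tp = TopicAndPartition("test-topic", 0)
    fetchMap.put(tp, 0L)                       // addPartition
    val tracked = fetchMap.get(tp).isDefined   // hasPartition
    fetchMap.remove(tp)                        // removePartition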