You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2012/09/17 22:16:00 UTC

svn commit: r1386806 [2/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/...

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Sep 17 20:15:59 2012
@@ -20,7 +20,6 @@ package kafka.server
 import java.io.IOException
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
-import kafka.common._
 import kafka.message._
 import kafka.network._
 import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging}
@@ -32,6 +31,7 @@ import kafka.network.RequestChannel.Resp
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
 import org.I0Itec.zkclient.ZkClient
+import kafka.common._
 
 /**
  * Logic to handle the various Kafka requests
@@ -40,13 +40,14 @@ class KafkaApis(val requestChannel: Requ
                 val replicaManager: ReplicaManager,
                 val zkClient: ZkClient,
                 brokerId: Int) extends Logging {
-  private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
+
+  private val producerRequestPurgatory = new ProducerRequestPurgatory
+  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val delayedRequestMetrics = new DelayedRequestMetrics
   private val topicNameValidator = new TopicNameValidator(replicaManager.config.maxTopicNameLength)
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
-  this.logIdent = "[KafkaApi on Broker " + brokerId + "], "
+  this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -93,18 +94,18 @@ class KafkaApis(val requestChannel: Requ
   }
 
   /**
-   * Check if the partitionDataArray from a produce request can unblock any
+   * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
-    var satisfied = new mutable.ArrayBuffer[DelayedFetch]
-    for(partitionData <- partitionDatas)
-      satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null)
-    trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size))
+  def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionData) {
+    val partition = partitionData.partition
+    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), null)
+    trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
+
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
-      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+      val response = FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
     }
   }
@@ -119,28 +120,25 @@ class KafkaApis(val requestChannel: Requ
       requestLogger.trace("Handling producer request " + request.toString)
     trace("Handling producer request " + request.toString)
 
-    val response = produceToLocalLog(produceRequest)
+    val localProduceResponse = produceToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
-    val partitionsInError = response.errors.count(_ != ErrorMapping.NoError)
-    
-    for (topicData <- produceRequest.data)
-      maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
-    
-    if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1 ||
-        produceRequest.data.size <= 0 || partitionsInError == response.errors.size)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+
+    val numPartitionsInError = localProduceResponse.status.count(_._2.error != ErrorMapping.NoError)
+    produceRequest.data.foreach(partitionAndData =>
+      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2))
+
+    if (produceRequest.requiredAcks == 0 ||
+        produceRequest.requiredAcks == 1 ||
+        produceRequest.numPartitions <= 0 ||
+        numPartitionsInError == produceRequest.numPartitions)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(localProduceResponse)))
     else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val producerRequestKeys = produceRequest.data.flatMap(topicData => {
-        val topic = topicData.topic
-        topicData.partitionDataArray.map(partitionData => {
-          RequestKey(topic, partitionData.partition)
-        })
-      })
+      val producerRequestKeys = produceRequest.data.keys.map(
+        topicAndPartition => new RequestKey(topicAndPartition)).toSeq
 
       val delayedProduce = new DelayedProduce(
-        producerRequestKeys, request,
-        response.errors, response.offsets,
+        producerRequestKeys, request, localProduceResponse,
         produceRequest, produceRequest.ackTimeoutMs.toLong)
       producerRequestPurgatory.watch(delayedProduce)
 
@@ -164,43 +162,41 @@ class KafkaApis(val requestChannel: Requ
    */
   private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
     trace("Produce [%s] to local log ".format(request.toString))
-    val requestSize = request.topicPartitionCount
-    val errors = new Array[Short](requestSize)
-    val offsets = new Array[Long](requestSize)
-
-    var msgIndex = -1
-    for(topicData <- request.data) {
-      for(partitionData <- topicData.partitionDataArray) {
-        msgIndex += 1
-        BrokerTopicStat.getBrokerTopicStat(topicData.topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
-        BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
-        try {
-          val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
-          val log = localReplica.log.get
-          log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
-          // we may need to increment high watermark since ISR could be down to 1
-          localReplica.partition.maybeIncrementLeaderHW(localReplica)
-          offsets(msgIndex) = log.logEndOffset
-          errors(msgIndex) = ErrorMapping.NoError.toShort
-          trace("%d bytes written to logs, nextAppendOffset = %d"
-            .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
-        } catch {
-          case e =>
-            BrokerTopicStat.getBrokerTopicStat(topicData.topic).failedProduceRequestRate.mark()
-            BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
-            error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
-            e match {
-              case _: IOException =>
-                fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-                System.exit(1)
-              case _ =>
-                errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort
-                offsets(msgIndex) = -1
-            }
-        }
+
+    val localErrorsAndOffsets = request.data.map (topicAndPartitionData => {
+      val (topic, partitionData) = (topicAndPartitionData._1.topic, topicAndPartitionData._2)
+      BrokerTopicStat.getBrokerTopicStat(topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
+      BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
+
+      try {
+        val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partitionData.partition)
+        val log = localReplica.log.get
+        log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+        // we may need to increment high watermark since ISR could be down to 1
+        localReplica.partition.maybeIncrementLeaderHW(localReplica)
+        val responseStatus = ProducerResponseStatus(ErrorMapping.NoError, log.logEndOffset)
+        trace("%d bytes written to logs, nextAppendOffset = %d"
+                      .format(partitionData.messages.sizeInBytes, responseStatus.nextOffset))
+        (TopicAndPartition(topic, partitionData.partition), responseStatus)
+      } catch {
+        case e: Throwable =>
+          BrokerTopicStat.getBrokerTopicStat(topic).failedProduceRequestRate.mark()
+          BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
+          error("Error processing ProducerRequest on %s:%d".format(topic, partitionData.partition), e)
+          e match {
+            case _: IOException =>
+              fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+              // compiler requires scala.sys.exit (not System.exit).
+              exit(1)
+            case _ =>
+              val (error, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L)
+              (TopicAndPartition(topic, partitionData.partition), ProducerResponseStatus(error, offset))
+          }
       }
     }
-    new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
+    )
+
+    ProducerResponse(request.versionId, request.correlationId, localErrorsAndOffsets)
   }
 
   /**
@@ -212,27 +208,17 @@ class KafkaApis(val requestChannel: Requ
       requestLogger.trace("Handling fetch request " + fetchRequest.toString)
     trace("Handling fetch request " + fetchRequest.toString)
 
-    // validate the request
-    try {
-      fetchRequest.validate()
-    } catch {
-      case e:FetchRequestFormatException =>
-        val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty)
-        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response))
-        requestChannel.sendResponse(channelResponse)
-    }
-
     if(fetchRequest.isFromFollower) {
       maybeUpdatePartitionHW(fetchRequest)
       // after updating HW, some delayed produce requests may be unblocked
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
-      fetchRequest.offsetInfo.foreach(topicOffsetInfo => {
-        topicOffsetInfo.partitions.foreach(partition => {
-          val key = RequestKey(topicOffsetInfo.topic, partition)
+      fetchRequest.requestInfo.foreach {
+        case (topicAndPartition, _) =>
+          val key = new RequestKey(topicAndPartition)
           satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
-        })
-      })
-      debug("Replica %d fetch unblocked %d producer requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
+      }
+      debug("Replica %d fetch unblocked %d producer requests."
+        .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
       satisfiedProduceRequests.foreach(_.respond())
     }
 
@@ -243,13 +229,13 @@ class KafkaApis(val requestChannel: Requ
        fetchRequest.numPartitions <= 0) {
       val topicData = readMessageSets(fetchRequest)
       debug("Returning fetch response %s for fetch request with correlation id %d".format(
-        topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
-      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
+        topicData.values.map(_.error).mkString(","), fetchRequest.correlationId))
+      val response = FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
       debug("Putting fetch request into purgatory")
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _)))
+      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
       val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait)
       fetchRequestPurgatory.watch(delayedFetch)
     }
@@ -259,86 +245,77 @@ class KafkaApis(val requestChannel: Requ
    * Calculate the number of available bytes for the given fetch request
    */
   private def availableFetchBytes(fetchRequest: FetchRequest): Long = {
-    var totalBytes = 0L
-    for(offsetDetail <- fetchRequest.offsetInfo) {
-      for(i <- 0 until offsetDetail.partitions.size) {
-        debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
+    val totalBytes = fetchRequest.requestInfo.foldLeft(0L)((folded, curr) => {
+      folded +
+      {
+        val (topic, partition) = (curr._1.topic, curr._1.partition)
+        val (offset, fetchSize) = (curr._2.offset, curr._2.fetchSize)
+        debug("Fetching log for topic %s partition %d".format(topic, partition))
         try {
-          val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i))
+          val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
           val end = if (!fetchRequest.isFromFollower) {
             leader.highWatermark
           } else {
             leader.logEndOffset
           }
-          val available = max(0, end - offsetDetail.offsets(i))
-          totalBytes += math.min(offsetDetail.fetchSizes(i), available)
+          val available = max(0, end - offset)
+          math.min(fetchSize, available)
         } catch {
           case e: UnknownTopicOrPartitionException =>
-            info("Invalid partition %d in fetch request from client %d."
-              .format(offsetDetail.partitions(i), fetchRequest.clientId))
+            info("Invalid partition %d in fetch request from client %s."
+                         .format(partition, fetchRequest.clientId))
+            0
           case e =>
             error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s"
-              .format(offsetDetail.topic, offsetDetail.partitions(i), brokerId, fetchRequest.clientId), e)
+                          .format(topic, partition, brokerId, fetchRequest.clientId), e)
+            0
         }
       }
-    }
+    })
     trace(totalBytes + " available bytes for fetch request.")
     totalBytes
   }
 
   private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
-    val offsets = fetchRequest.offsetInfo
-    debug("Act on update partition HW, check offset detail: %s ".format(offsets))
-    for(offsetDetail <- offsets) {
-      val topic = offsetDetail.topic
-      val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
-      for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
-        replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
-      }
-    }
+    debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
+    fetchRequest.requestInfo.foreach(info => {
+      val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset)
+      replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
+    })
   }
 
   /**
-   * Read from all the offset details given and produce an array of topic datas
-   */
-  private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = {
-    val offsets = fetchRequest.offsetInfo
-    val fetchedData = new mutable.ArrayBuffer[TopicData]()
-
-    for(offsetDetail <- offsets) {
-      val info = new mutable.ArrayBuffer[PartitionData]()
-      val topic = offsetDetail.topic
-      val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
-      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
-        val isFetchFromFollower = fetchRequest.isFromFollower()
-        val partitionInfo =
-          try {
-            val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
-            BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
-            BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
-            if (!isFetchFromFollower) {
-              new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
-            } else {
-              debug("Leader %d for topic %s partition %d received fetch request from follower %d"
-                .format(brokerId, topic, partition, fetchRequest.replicaId))
-              debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
-                .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
-              new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
-            }
-          }
-          catch {
-            case e =>
-              BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
-              BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
-              error("error when processing request " + (topic, partition, offset, fetchSize), e)
-              new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-                                offset, -1L, MessageSet.Empty)
+   * Read from all the offset details given and return a map of
+   * (topic, partition) -> PartitionData
+   */
+  private def readMessageSets(fetchRequest: FetchRequest) = {
+    val isFetchFromFollower = fetchRequest.isFromFollower
+    fetchRequest.requestInfo.map {
+      case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+        val partitionData = try {
+          val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
+          BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
+          BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
+          if (!isFetchFromFollower) {
+            new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+          } else {
+            debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+                          .format(brokerId, topic, partition, fetchRequest.replicaId))
+            debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
+                          .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+            new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
           }
-        info.append(partitionInfo)
-      }
-      fetchedData.append(new TopicData(topic, info.toArray))
+        }
+        catch {
+          case t: Throwable =>
+            BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
+            BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
+            error("error when processing request " + (topic, partition, offset, fetchSize), t)
+            new PartitionData(partition, ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
+                              offset, -1L, MessageSet.Empty)
+        }
+        (TopicAndPartition(topic, partition), partitionData)
     }
-    fetchedData.toArray
   }
 
   /**
@@ -454,8 +431,14 @@ class KafkaApis(val requestChannel: Requ
 
   private [kafka] case class RequestKey(topic: String, partition: Int)
           extends MetricKey {
+
+    def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
+
+    def topicAndPartition = TopicAndPartition(topic, partition)
+
     override def keyLabel = "%s-%d".format(topic, partition)
   }
+
   /**
    * A delayed fetch request
    */
@@ -465,9 +448,9 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
 
-    this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
+    this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
@@ -480,7 +463,7 @@ class KafkaApis(val requestChannel: Requ
      */
     def expire(delayed: DelayedFetch) {
       val topicData = readMessageSets(delayed.fetch)
-      val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+      val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
       val fromFollower = delayed.fetch.isFromFollower
       delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
@@ -489,48 +472,43 @@ class KafkaApis(val requestChannel: Requ
 
   class DelayedProduce(keys: Seq[RequestKey],
                        request: RequestChannel.Request,
-                       localErrors: Array[Short],
-                       requiredOffsets: Array[Long],
+                       localProduceResponse: ProducerResponse,
                        val produce: ProducerRequest,
                        delayMs: Long)
           extends DelayedRequest(keys, request, delayMs) with Logging {
 
+    private val initialErrorsAndOffsets = localProduceResponse.status
     /**
      * Map of (topic, partition) -> partition status
      * The values in this map don't need to be synchronized since updates to the
      * values are effectively synchronized by the ProducerRequestPurgatory's
      * update method
      */
-    private [kafka] val partitionStatus = keys.map(key => {
-      val keyIndex = keys.indexOf(key)
+    private [kafka] val partitionStatus = keys.map(requestKey => {
+      val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition))
       // if there was an error in writing to the local replica's log, then don't
       // wait for acks on this partition
-      val acksPending =
-        if (localErrors(keyIndex) == ErrorMapping.NoError) {
+      val (acksPending, error, nextOffset) =
+        if (producerResponseStatus.error == ErrorMapping.NoError) {
           // Timeout error state will be cleared when requiredAcks are received
-          localErrors(keyIndex) = ErrorMapping.RequestTimedOutCode
-          true
+          (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.nextOffset)
         }
-        else
-          false
+        else (false, producerResponseStatus.error, producerResponseStatus.nextOffset)
 
-      val initialStatus = new PartitionStatus(acksPending, localErrors(keyIndex), requiredOffsets(keyIndex))
-      trace("Initial partition status for %s = %s".format(key, initialStatus))
-      (key, initialStatus)
+      val initialStatus = PartitionStatus(acksPending, error, nextOffset)
+      trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
+      (requestKey, initialStatus)
     }).toMap
 
-
     def respond() {
-      val errorsAndOffsets: (List[Short], List[Long]) = (
-        keys.foldRight
-          ((List[Short](), List[Long]()))
-          ((key: RequestKey, result: (List[Short], List[Long])) => {
-            val status = partitionStatus(key)
-            (status.error :: result._1, status.requiredOffset :: result._2)
-          })
-        )
-      val response = new ProducerResponse(produce.versionId, produce.correlationId,
-                                          errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
+      
+      val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
+        status => {
+          val pstat = partitionStatus(new RequestKey(status._1))
+          (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
+        })
+      
+      val response = ProducerResponse(produce.versionId, produce.correlationId, finalErrorsAndOffsets)
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
@@ -565,9 +543,8 @@ class KafkaApis(val requestChannel: Requ
           fetchPartitionStatus.error = ErrorMapping.NoError
         }
         if (!fetchPartitionStatus.acksPending) {
-          val topicData = produce.data.find(_.topic == topic).get
-          val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get
-          maybeUnblockDelayedFetchRequests(topic, Array(partitionData))
+          val partitionData = produce.data(followerFetchRequestKey.topicAndPartition)
+          maybeUnblockDelayedFetchRequests(topic, partitionData)
         }
       }
 
@@ -576,9 +553,9 @@ class KafkaApis(val requestChannel: Requ
       satisfied
     }
 
-    class PartitionStatus(var acksPending: Boolean,
+    case class PartitionStatus(var acksPending: Boolean,
                           var error: Short,
-                          val requiredOffset: Long) {
+                          requiredOffset: Long) {
       def setThisBrokerNotLeader() {
         error = ErrorMapping.NotLeaderForPartitionCode
         acksPending = false
@@ -594,9 +571,9 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
+  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
 
-    this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId)
+    this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
 
     protected def checkSatisfied(followerFetchRequestKey: RequestKey,
                                  delayedProduce: DelayedProduce) =

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Mon Sep 17 20:15:59 2012
@@ -23,7 +23,7 @@ class ReplicaFetcherManager(private val 
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d on broker %d, ".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d-".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
   }
 
   def shutdown() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Mon Sep 17 20:15:59 2012
@@ -20,6 +20,8 @@ package kafka.server
 import kafka.api.{OffsetRequest, PartitionData}
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
+import kafka.common.TopicAndPartition
+
 
 class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
@@ -56,7 +58,7 @@ class ReplicaFetcherThread(name:String, 
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
+  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
     // no handler needed since the controller will make the changes accordingly
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Sep 17 20:15:59 2012
@@ -367,7 +367,7 @@ object Utils extends Logging {
   /**
    * Read an unsigned integer from the given position without modifying the buffers
    * position
-   * @param The buffer to read from
+   * @param buffer the buffer to read from
    * @param index the index from which to read the integer
    * @return The integer read, as a long to avoid signedness
    */

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Mon Sep 17 20:15:59 2012
@@ -72,7 +72,7 @@ class LazyInitProducerTest extends JUnit
     // send an invalid offset
     try {
       val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
-      fetchedWithError.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+      fetchedWithError.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
       fail("Expected an OffsetOutOfRangeException exception to be thrown")
     } catch {
       case e: OffsetOutOfRangeException => 
@@ -101,23 +101,22 @@ class LazyInitProducerTest extends JUnit
       }
     }
 
-    {
-      // send some invalid offsets
-      val builder = new FetchRequestBuilder()
-      for( (topic, offset) <- topicOffsets )
-        builder.addFetch(topic, offset, -1, 10000)
+    // send some invalid offsets
+    val builder = new FetchRequestBuilder()
+    for( (topic, offset) <- topicOffsets )
+      builder.addFetch(topic, offset, -1, 10000)
+
+    val request = builder.build()
+    val responses = consumer.fetch(request)
+    responses.data.values.foreach(pd => {
+      try {
+        ErrorMapping.maybeThrowException(pd.error)
+        fail("Expected an OffsetOutOfRangeException exception to be thrown")
+      } catch {
+        case e: OffsetOutOfRangeException =>
 
-      val request = builder.build()
-      val responses = consumer.fetch(request)
-      for(topicData <- responses.data) {
-        try {
-          topicData.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
-          fail("Expected an OffsetOutOfRangeException exception to be thrown")
-        } catch {
-          case e: OffsetOutOfRangeException =>
-        }
       }
-    }
+    })
   }
 
   def testMultiProduce() {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Mon Sep 17 20:15:59 2012
@@ -19,7 +19,7 @@ package kafka.integration
 
 import java.nio.ByteBuffer
 import junit.framework.Assert._
-import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
+import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import java.util.Properties
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
@@ -31,8 +31,8 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.common.{ErrorMapping, UnknownTopicOrPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -77,27 +77,11 @@ class PrimitiveApiTest extends JUnit3Sui
     assertEquals(request, deserializedRequest)
   }
 
-  def testFetchRequestEnforcesUniqueTopicsForOffsetDetails() {
-    val offsets = Array(
-      new OffsetDetail("topic1", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
-      new OffsetDetail("topic2", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
-      new OffsetDetail("topic1", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
-      new OffsetDetail("topic2", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000))
-    )
-    val request = new FetchRequest(offsetInfo = offsets)
-    try {
-      consumer.fetch(request)
-      fail("FetchRequest should throw FetchRequestFormatException due to duplicate topics")
-    } catch {
-      case e: FetchRequestFormatException => "success"
-    }
-  }
-
   def testEmptyFetchRequest() {
-    val offsets = Array[OffsetDetail]()
-    val request = new FetchRequest(offsetInfo = offsets)
+    val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
+    val request = new FetchRequest(requestInfo = partitionRequests)
     val fetched = consumer.fetch(request)
-    assertTrue(fetched.errorCode == ErrorMapping.NoError && fetched.data.size == 0)
+    assertTrue(!fetched.hasError && fetched.data.size == 0)
   }
 
   def testDefaultEncoderProducerAndFetch() {
@@ -189,7 +173,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+        response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case e: OffsetOutOfRangeException => "this is good"
@@ -205,7 +189,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+        response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: UnknownTopicOrPartitionException => "this is good"
@@ -253,7 +237,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+        response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case e: OffsetOutOfRangeException => "this is good"
@@ -269,7 +253,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+        response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: UnknownTopicOrPartitionException => "this is good"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala Mon Sep 17 20:15:59 2012
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.network;
+package kafka.network
 
 import org.junit._
 import org.scalatest.junit.JUnitSuite
@@ -24,31 +24,45 @@ import java.nio.ByteBuffer
 import kafka.api._
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.cluster.Broker
-import kafka.common.ErrorMapping
 import collection.mutable._
+import kafka.common.{TopicAndPartition, ErrorMapping}
+
 
 object RpcDataSerializationTestUtils{
   private val topic1 = "test1"
   private val topic2 = "test2"
-  private val leader1 = 0;
+  private val leader1 = 0
   private val isr1 = List(0, 1, 2)
-  private val leader2 = 0;
+  private val leader2 = 0
   private val isr2 = List(0, 2, 3)
   private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
   private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
   private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
   private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
   private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3)
-  private val topicData1 = new TopicData(topic1, partitionDataArray)
-  private val topicData2 = new TopicData(topic2, partitionDataArray)
-  private val topicDataArray = Array(topicData1, topicData2)
-  private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
-  private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
-  private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2)
-  private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
-  private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
-  private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
-  private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
+
+  private val topicData = {
+    val groupedData = Array(topic1, topic2).flatMap(topic =>
+      partitionDataArray.map(partitionData =>
+        (TopicAndPartition(topic, partitionData.partition), partitionData)))
+    collection.immutable.Map(groupedData:_*)
+  }
+
+  private val requestInfos = collection.immutable.Map(
+    TopicAndPartition(topic1, 0) -> PartitionFetchInfo(1000, 100),
+    TopicAndPartition(topic1, 1) -> PartitionFetchInfo(2000, 100),
+    TopicAndPartition(topic1, 2) -> PartitionFetchInfo(3000, 100),
+    TopicAndPartition(topic1, 3) -> PartitionFetchInfo(4000, 100),
+    TopicAndPartition(topic2, 0) -> PartitionFetchInfo(1000, 100),
+    TopicAndPartition(topic2, 1) -> PartitionFetchInfo(2000, 100),
+    TopicAndPartition(topic2, 2) -> PartitionFetchInfo(3000, 100),
+    TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
+  )
+
+  private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
   private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
   private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
@@ -78,19 +92,21 @@ object RpcDataSerializationTestUtils{
   }
 
   def createTestProducerRequest: ProducerRequest = {
-    new ProducerRequest(1, "client 1", 0, 1000, topicDataArray)
+    new ProducerRequest(1, "client 1", 0, 1000, topicData)
   }
 
-  def createTestProducerResponse: ProducerResponse = {
-    new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000l, 2000l), 0)
-  }
+  def createTestProducerResponse: ProducerResponse =
+    ProducerResponse(1, 1, Map(
+      TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001),
+      TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
+    ))
 
   def createTestFetchRequest: FetchRequest = {
-    new FetchRequest(offsetInfo = offsetDetailSeq)
+    new FetchRequest(requestInfo = requestInfos)
   }
 
   def createTestFetchResponse: FetchResponse = {
-    new FetchResponse(1, 1, topicDataArray)
+    FetchResponse(1, 1, topicData)
   }
 
   def createTestOffsetRequest: OffsetRequest = {
@@ -154,7 +170,7 @@ class RpcDataSerializationTest extends J
     assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse,
                  deserializedStopReplicaResponse)
 
-    buffer = ByteBuffer.allocate(producerRequest.sizeInBytes())
+    buffer = ByteBuffer.allocate(producerRequest.sizeInBytes)
     producerRequest.writeTo(buffer)
     buffer.rewind()
     val deserializedProducerRequest = ProducerRequest.readFrom(buffer)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Mon Sep 17 20:15:59 2012
@@ -25,8 +25,10 @@ import kafka.utils.TestUtils
 import java.util.Random
 import junit.framework.Assert._
 import kafka.producer.SyncProducerConfig
-import kafka.api.{TopicData, ProducerRequest}
+import kafka.api.{PartitionData, ProducerRequest}
 import java.nio.ByteBuffer
+import kafka.common.TopicAndPartition
+
 
 class SocketServerTest extends JUnitSuite {
 
@@ -75,9 +77,10 @@ class SocketServerTest extends JUnitSuit
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+    val emptyRequest =
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]())
 
-    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes())
+    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
     emptyRequest.writeTo(byteBuffer)
     byteBuffer.rewind()
     val serializedBytes = new Array[Byte](byteBuffer.remaining)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Mon Sep 17 20:15:59 2012
@@ -201,11 +201,11 @@ class AsyncProducerTest extends JUnit3Su
     topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
     val expectedResult = Some(Map(
         0 -> Map(
-              ("topic1", 0) -> topic1Broker1Data,
-              ("topic2", 0) -> topic2Broker1Data),
+              TopicAndPartition("topic1", 0) -> topic1Broker1Data,
+              TopicAndPartition("topic2", 0) -> topic2Broker1Data),
         1 -> Map(
-              ("topic1", 1) -> topic1Broker2Data,
-              ("topic2", 1) -> topic2Broker2Data)
+              TopicAndPartition("topic1", 1) -> topic1Broker2Data,
+              TopicAndPartition("topic2", 1) -> topic2Broker2Data)
       ))
 
     val actualResult = handler.partitionAndCollate(producerDataList)
@@ -344,7 +344,7 @@ class AsyncProducerTest extends JUnit3Su
     partitionedDataOpt match {
       case Some(partitionedData) =>
         for ((brokerId, dataPerBroker) <- partitionedData) {
-          for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
+          for ( (TopicAndPartition(topic, partitionId), dataPerTopic) <- dataPerBroker)
             assertTrue(partitionId == 0)
         }
       case None =>
@@ -408,10 +408,12 @@ class AsyncProducerTest extends JUnit3Su
     // entirely.  The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
     val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0)
-    val response1 =
-      new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
+    val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
+      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
+          (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs))
-    val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
+    val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
+      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
     EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Mon Sep 17 20:15:59 2012
@@ -21,14 +21,14 @@ import java.net.SocketTimeoutException
 import java.util.Properties
 import junit.framework.Assert
 import kafka.admin.CreateTopicCommand
-import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
 import kafka.integration.KafkaServerTestHarness
-import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.TopicData
+import kafka.api.{ProducerResponseStatus, PartitionData}
+import kafka.common.{TopicAndPartition, ErrorMapping}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
@@ -85,11 +85,11 @@ class SyncProducerTest extends JUnit3Sui
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
-    Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0)
+    Assert.assertTrue(!response.hasError && response.status.size == 0)
   }
 
   @Test
@@ -109,17 +109,17 @@ class SyncProducerTest extends JUnit3Sui
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
     val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
 
-    Assert.assertEquals(1, response1.errors.length)
-    Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.errors(0))
-    Assert.assertEquals(-1L, response1.offsets(0))
+    Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
+    Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error)
+    Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).nextOffset)
 
     val message2 = new Message(new Array[Byte](1000000))
     val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
     val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
 
-    Assert.assertEquals(1, response2.errors.length)
-    Assert.assertEquals(ErrorMapping.NoError, response2.errors(0))
-    Assert.assertEquals(messageSet2.sizeInBytes, response2.offsets(0))
+    Assert.assertEquals(0, response2.status.count(_._2.error != ErrorMapping.NoError))
+    Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error)
+    Assert.assertEquals(messageSet2.sizeInBytes, response2.status(TopicAndPartition("test", 0)).nextOffset)
   }
 
   @Test
@@ -142,10 +142,12 @@ class SyncProducerTest extends JUnit3Sui
 
     Assert.assertNotNull(response)
     Assert.assertEquals(request.correlationId, response.correlationId)
-    Assert.assertEquals(response.errors.length, response.offsets.length)
-    Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, _))
-    response.offsets.foreach(Assert.assertEquals(-1L, _))
+    Assert.assertEquals(3, response.status.size)
+    response.status.values.foreach {
+      case ProducerResponseStatus(error, nextOffset) =>
+        Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, error)
+        Assert.assertEquals(-1L, nextOffset)
+    }
 
     // #2 - test that we get correct offsets when partition is owned by broker
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
@@ -156,18 +158,18 @@ class SyncProducerTest extends JUnit3Sui
     val response2 = producer.send(request)
     Assert.assertNotNull(response2)
     Assert.assertEquals(request.correlationId, response2.correlationId)
-    Assert.assertEquals(response2.errors.length, response2.offsets.length)
-    Assert.assertEquals(3, response2.errors.length)
+    Assert.assertEquals(3, response2.status.size)
 
     // the first and last message should have been accepted by broker
-    Assert.assertEquals(0, response2.errors(0))
-    Assert.assertEquals(0, response2.errors(2))
-    Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
-    Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
+    Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic1", 0)).error)
+    Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic3", 0)).error)
+    Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic1", 0)).nextOffset)
+    Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic3", 0)).nextOffset)
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1))
-    Assert.assertEquals(-1, response2.offsets(1))
+    Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort,
+                        response2.status(TopicAndPartition("topic2", 0)).error)
+    Assert.assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).nextOffset)
   }
 
   @Test

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Sep 17 20:15:59 2012
@@ -34,7 +34,9 @@ import kafka.consumer.ConsumerConfig
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
 import kafka.api._
+import collection.mutable.Map
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
+import kafka.common.TopicAndPartition
 
 
 /**
@@ -364,28 +366,10 @@ object TestUtils extends Logging {
     val correlationId = SyncProducerConfig.DefaultCorrelationId
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-    val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
-    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, data.toArray)
-  }
-
-  def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
-    produceJavaRequest(-1,topic,-1,message)
-  }
-
-  def produceJavaRequest(topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
-    produceJavaRequest(-1,topic,partition,message)
-  }
-
-  def produceJavaRequest(correlationId: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
-    val clientId = "test"
-    val requiredAcks: Short = 0
-    val ackTimeoutMs = 0
-    var data = new Array[TopicData](1)
-    var partitionData = new Array[PartitionData](1)
-    partitionData(0) = new PartitionData(partition,message.underlying)
-    data(0) = new TopicData(topic,partitionData)
-    val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
-    pr
+    val data = topics.flatMap(topic =>
+      partitions.map(partition => (TopicAndPartition(topic,  partition), new PartitionData(partition, message)))
+    )
+    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }
 
   def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {