Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/06 03:28:48 UTC

svn commit: r1346697 - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/ core/src/m...

Author: junrao
Date: Wed Jun  6 01:28:46 2012
New Revision: 1346697

URL: http://svn.apache.org/viewvc?rev=1346697&view=rev
Log:
Add admin RPC requests; clean up Response objects; patched by Yang Ye; reviewed by Jun Rao; KAFKA-349; KAFKA-336
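
The unifying change: requests and responses now share the kafka.api.RequestOrResponse base class added below. Requests pass Some of their RequestKeys id so the channel can prefix the two-byte key; responses use the None default and get no prefix. Error codes also change from Int to Short across the board. An illustrative sketch of the resulting framing (frame is an illustrative helper, not part of this commit):

    import java.nio.ByteBuffer
    import kafka.api.RequestOrResponse

    // Illustrative helper: 4-byte length prefix, then the 2-byte request key
    // (requests only), then the serialized body.
    def frame(r: RequestOrResponse): ByteBuffer = {
      val idBytes = if (r.requestId.isDefined) 2 else 0
      val buffer = ByteBuffer.allocate(4 + idBytes + r.sizeInBytes)
      buffer.putInt(idBytes + r.sizeInBytes)    // size of everything after this Int
      r.requestId.foreach(buffer.putShort(_))   // present for requests, absent for responses
      r.writeTo(buffer)
      buffer.rewind()
      buffer
    }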

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Request.scala
Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Wed Jun  6 01:28:46 2012
@@ -255,7 +255,7 @@ public class KafkaETLContext {
      */
     protected boolean hasError(ByteBufferMessageSet messages)
             throws IOException {
-        int errorCode = messages.getErrorCode();
+        short errorCode = messages.getErrorCode();
         if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
             /* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
                Kafka server may delete old files from time to time */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Wed Jun  6 01:28:46 2012
@@ -18,7 +18,6 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.network.Request
 import kafka.utils.Utils
 import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
 import kafka.common.FetchRequestFormatException
@@ -105,7 +104,7 @@ case class FetchRequest(versionId: Short
                         replicaId: Int = FetchRequest.DefaultReplicaId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
-                        offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
+                        offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) {
 
   // ensure that a topic "X" appears in at most one OffsetDetail
   def validate() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Wed Jun  6 01:28:46 2012
@@ -26,8 +26,8 @@ import kafka.utils.Utils
 
 object PartitionData {
   def readFrom(buffer: ByteBuffer): PartitionData = {
+    val error = buffer.getShort
     val partition = buffer.getInt
-    val error = buffer.getInt
     val initialOffset = buffer.getLong
     val hw = buffer.getLong()
     val messageSetSize = buffer.getInt
@@ -38,21 +38,48 @@ object PartitionData {
   }
 }
 
-case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L,
-                         messages: MessageSet) {
-  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() + 8
+case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
+  val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
+}
+
+// SENDS
+
+class PartitionDataSend(val partitionData: PartitionData) extends Send {
+  private val messageSize = partitionData.messages.sizeInBytes
+  private var messagesSentSize = 0L
+
+  private val buffer = ByteBuffer.allocate(26)
+  buffer.putShort(partitionData.error)
+  buffer.putInt(partitionData.partition)
+  buffer.putLong(partitionData.initialOffset)
+  buffer.putLong(partitionData.hw)
+  buffer.putInt(partitionData.messages.sizeInBytes.intValue())
+  buffer.rewind()
+
+  def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
 
+  def writeTo(channel: GatheringByteChannel): Int = {
+    var written = 0
+    if(buffer.hasRemaining)
+      written += channel.write(buffer)
+    if(!buffer.hasRemaining && messagesSentSize < messageSize) {
+      val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
+      messagesSentSize += bytesSent
+      written += bytesSent
+    }
+    written
+  }
 }
 
-object TopicData {
 
+object TopicData {
   def readFrom(buffer: ByteBuffer): TopicData = {
     val topic = Utils.readShortString(buffer, "UTF-8")
     val partitionCount = buffer.getInt
     val partitions = new Array[PartitionData](partitionCount)
-    for(i <- 0 until partitions.length)
+    for(i <- 0 until partitionCount)
       partitions(i) = PartitionData.readFrom(buffer)
     new TopicData(topic, partitions.sortBy(_.partition))
   }
@@ -90,26 +117,61 @@ case class TopicData(topic: String, part
   }
 }
 
-object FetchResponse {
-  val CurrentVersion = 1.shortValue()
+class TopicDataSend(val topicData: TopicData) extends Send {
+  val size = topicData.sizeInBytes
 
+  var sent = 0
+
+  private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
+  Utils.writeShortString(buffer, topicData.topic, "UTF-8")
+  buffer.putInt(topicData.partitionData.length)
+  buffer.rewind()
+
+  val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) {
+    val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes)
+  }
+
+  def complete = sent >= size
+
+  def writeTo(channel: GatheringByteChannel): Int = {
+    expectIncomplete()
+    var written = 0
+    if(buffer.hasRemaining)
+      written += channel.write(buffer)
+    if(!buffer.hasRemaining && !sends.complete) {
+      written += sends.writeCompletely(channel)
+    }
+    sent += written
+    written
+  }
+}
+
+
+
+
+object FetchResponse {
   def readFrom(buffer: ByteBuffer): FetchResponse = {
     val versionId = buffer.getShort
+    val errorCode = buffer.getShort
     val correlationId = buffer.getInt
     val dataCount = buffer.getInt
     val data = new Array[TopicData](dataCount)
     for(i <- 0 until data.length)
       data(i) = TopicData.readFrom(buffer)
-    new FetchResponse(versionId, correlationId, data)
+    new FetchResponse(versionId, correlationId, data, errorCode)
   }
 }
 
-case class FetchResponse(versionId: Short, correlationId: Int, data: Array[TopicData])  {
 
-  val sizeInBytes = 2 + 4 + data.foldLeft(4)(_ + _.sizeInBytes)
+case class FetchResponse(versionId: Short,
+                         correlationId: Int,
+                         data: Array[TopicData],
+                         errorCode: Short = ErrorMapping.NoError)  {
+
+  val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes)
 
   lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
-  
+
   def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
     val messageSet = topicMap.get(topic) match {
       case Some(topicData) =>
@@ -129,75 +191,15 @@ case class FetchResponse(versionId: Shor
   }
 }
 
-// SENDS
-
-class PartitionDataSend(val partitionData: PartitionData) extends Send {
-  private val messageSize = partitionData.messages.sizeInBytes
-  private var messagesSentSize = 0L
-
-  private val buffer = ByteBuffer.allocate(28)
-  buffer.putInt(partitionData.partition)
-  buffer.putInt(partitionData.error)
-  buffer.putLong(partitionData.initialOffset)
-  buffer.putLong(partitionData.hw)
-  buffer.putInt(partitionData.messages.sizeInBytes.intValue())
-  buffer.rewind()
-
-  def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
-
-  def writeTo(channel: GatheringByteChannel): Int = {
-    var written = 0
-    if(buffer.hasRemaining)
-      written += channel.write(buffer)
-    if(!buffer.hasRemaining && messagesSentSize < messageSize) {
-      val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
-      messagesSentSize += bytesSent
-      written += bytesSent
-    }
-    written
-  }
-}
-
-class TopicDataSend(val topicData: TopicData) extends Send {
-  val size = topicData.sizeInBytes
-
-  var sent = 0
-
-  private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
-  Utils.writeShortString(buffer, topicData.topic, "UTF-8")
-  buffer.putInt(topicData.partitionData.length)
-  buffer.rewind()
-
-  val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) {
-    val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes)
-  }
-
-  def complete = sent >= size
-
-  def writeTo(channel: GatheringByteChannel): Int = {
-    expectIncomplete()
-    var written = 0
-    if(buffer.hasRemaining)
-      written += channel.write(buffer)
-    if(!buffer.hasRemaining && !sends.complete) {
-      written += sends.writeCompletely(channel)
-    }
-    sent += written
-    written
-  }
-}
-
-class FetchResponseSend(val fetchResponse: FetchResponse,
-                        val errorCode: Int = ErrorMapping.NoError) extends Send {
 
+class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
   private val size = fetchResponse.sizeInBytes
-  
   private var sent = 0
   
   private val buffer = ByteBuffer.allocate(16)
-  buffer.putInt(size + 2)
-  buffer.putShort(errorCode.shortValue())
+  buffer.putInt(size)
   buffer.putShort(fetchResponse.versionId)
+  buffer.putShort(fetchResponse.errorCode)
   buffer.putInt(fetchResponse.correlationId)
   buffer.putInt(fetchResponse.data.length)
   buffer.rewind()
@@ -220,6 +222,5 @@ class FetchResponseSend(val fetchRespons
     written
   }
 
-  def sendSize = 4 + 2 + fetchResponse.sizeInBytes
-
+  def sendSize = 4 + fetchResponse.sizeInBytes
 }
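
Two wire changes land here: the per-partition error code shrinks from a 4-byte Int to a 2-byte Short and moves ahead of the partition id, and FetchResponse gains a top-level errorCode right after the version id (previously bolted on by FetchResponseSend). A sketch of the new top-level header, following readFrom above:

    import java.nio.ByteBuffer

    // New fetch response header layout.
    def readFetchHeader(buffer: ByteBuffer): (Short, Short, Int, Int) = {
      val versionId     = buffer.getShort  // 2 bytes
      val errorCode     = buffer.getShort  // 2 bytes, now part of FetchResponse itself
      val correlationId = buffer.getInt    // 4 bytes
      val dataCount     = buffer.getInt    // 4 bytes: number of TopicData blocks
      (versionId, errorCode, correlationId, dataCount)
    }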

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala Wed Jun  6 01:28:46 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package kafka.api
+
+import java.nio._
+import kafka.utils._
+import collection.mutable.Map
+import collection.mutable.HashMap
+
+
+object LeaderAndISR {
+  def readFrom(buffer: ByteBuffer): LeaderAndISR = {
+    val leader = buffer.getInt
+    val leaderGenId = buffer.getInt
+    val ISRString = Utils.readShortString(buffer, "UTF-8")
+    val ISR = ISRString.split(",").map(_.toInt).toList
+    val zkVersion = buffer.getLong
+    new LeaderAndISR(leader, leaderGenId, ISR, zkVersion)
+  }
+}
+
+case class LeaderAndISR(leader: Int, leaderEpoc: Int, ISR: List[Int], zkVersion: Long){
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(leader)
+    buffer.putInt(leaderEpoc)
+    Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8")
+    buffer.putLong(zkVersion)
+  }
+
+  def sizeInBytes(): Int = {
+    val size = 4 + 4 + (2 + ISR.mkString(",").length) + 8
+    size
+  }
+}
+
+
+object LeaderAndISRRequest {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = {
+    val versionId = buffer.getShort
+    val clientId = Utils.readShortString(buffer)
+    val isInit = buffer.get()
+    val ackTimeout = buffer.getInt
+    val leaderAndISRRequestCount = buffer.getInt
+    val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
+
+    for(i <- 0 until leaderAndISRRequestCount){
+      val topic = Utils.readShortString(buffer, "UTF-8")
+      val partition = buffer.getInt
+      val leaderAndISRRequest = LeaderAndISR.readFrom(buffer)
+
+      leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
+    }
+    new LeaderAndISRRequest(versionId, clientId, isInit, ackTimeout, leaderAndISRInfos)
+  }
+}
+
+
+case class LeaderAndISRRequest (versionId: Short,
+                                clientId: String,
+                                isInit: Byte,
+                                ackTimeout: Int,
+                                leaderAndISRInfos:
+                                Map[(String, Int), LeaderAndISR])
+        extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
+  def this(isInit: Byte, ackTimeout: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
+    this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeout, leaderAndISRInfos)
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    Utils.writeShortString(buffer, clientId)
+    buffer.put(isInit)
+    buffer.putInt(ackTimeout)
+    buffer.putInt(leaderAndISRInfos.size)
+    for((key, value) <- leaderAndISRInfos){
+      Utils.writeShortString(buffer, key._1, "UTF-8")
+      buffer.putInt(key._2)
+      value.writeTo(buffer)
+    }
+  }
+
+  def sizeInBytes(): Int = {
+    var size = 1 + 2 + (2 + clientId.length) + 4 + 4
+    for((key, value) <- leaderAndISRInfos)
+      size += (2 + key._1.length) + 4 + value.sizeInBytes
+    size
+  }
+}
\ No newline at end of file
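
A round-trip sketch using the classes added above; the topic name and broker ids are illustrative. Note that the ISR is serialized as a comma-separated short string rather than a length-prefixed array:

    import java.nio.ByteBuffer
    import collection.mutable.HashMap
    import kafka.api.{LeaderAndISR, LeaderAndISRRequest}

    // leader = 0, leaderEpoc = 1, ISR = brokers 0,1,2, zkVersion = 0
    val leaderAndISR = new LeaderAndISR(0, 1, List(0, 1, 2), 0L)
    val infos = new HashMap[(String, Int), LeaderAndISR]
    infos.put(("test-topic", 0), leaderAndISR)

    // Auxiliary constructor fills in CurrentVersion and DefaultClientId.
    val request = new LeaderAndISRRequest(1.toByte, 1000, infos)   // isInit, ackTimeout

    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    val roundTripped = LeaderAndISRRequest.readFrom(buffer)        // equals request field-for-field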

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala Wed Jun  6 01:28:46 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import kafka.common.ErrorMapping
+import java.nio.ByteBuffer
+import kafka.utils.Utils
+import collection.mutable.HashMap
+import collection.mutable.Map
+
+
+object LeaderAndISRResponse {
+  def readFrom(buffer: ByteBuffer): LeaderAndISRResponse = {
+    val versionId = buffer.getShort
+    val errorCode = buffer.getShort
+    val numEntries = buffer.getInt
+    val responseMap = new HashMap[(String, Int), Short]()
+    for (i<- 0 until numEntries){
+      val topic = Utils.readShortString(buffer, "UTF-8")
+      val partition = buffer.getInt
+      val partitionErrorCode = buffer.getShort
+      responseMap.put((topic, partition), partitionErrorCode)
+    }
+    new LeaderAndISRResponse(versionId, responseMap, errorCode)
+  }
+}
+
+
+case class LeaderAndISRResponse(versionId: Short,
+                                responseMap: Map[(String, Int), Short],
+                                errorCode: Short = ErrorMapping.NoError)
+        extends RequestOrResponse{
+  def sizeInBytes(): Int ={
+    var size = 2 + 2 + 4
+    for ((key, value) <- responseMap){
+      size += 2 + key._1.length + 4 + 2
+    }
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putShort(errorCode)
+    buffer.putInt(responseMap.size)
+    for ((key:(String, Int), value) <- responseMap){
+      Utils.writeShortString(buffer, key._1, "UTF-8")
+      buffer.putInt(key._2)
+      buffer.putShort(value)
+    }
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Wed Jun  6 01:28:46 2012
@@ -18,83 +18,50 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.utils.{nonthreadsafe, Utils}
-import kafka.network.{Send, Request}
-import java.nio.channels.GatheringByteChannel
-import kafka.common.ErrorMapping
+import kafka.utils.Utils
 
 object OffsetRequest {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = ""
+
   val SmallestTimeString = "smallest"
   val LargestTimeString = "largest"
   val LatestTime = -1L
   val EarliestTime = -2L
 
   def readFrom(buffer: ByteBuffer): OffsetRequest = {
+    val versionId = buffer.getShort
+    val clientId = Utils.readShortString(buffer)
     val topic = Utils.readShortString(buffer, "UTF-8")
     val partition = buffer.getInt()
     val offset = buffer.getLong
     val maxNumOffsets = buffer.getInt
-    new OffsetRequest(topic, partition, offset, maxNumOffsets)
+    new OffsetRequest(versionId, clientId, topic, partition, offset, maxNumOffsets)
   }
+}
 
-  def serializeOffsetArray(offsets: Array[Long]): ByteBuffer = {
-    val size = 4 + 8 * offsets.length
-    val buffer = ByteBuffer.allocate(size)
-    buffer.putInt(offsets.length)
-    for (i <- 0 until offsets.length)
-      buffer.putLong(offsets(i))
-    buffer.rewind
-    buffer
-  }
+case class OffsetRequest(versionId: Short = OffsetRequest.CurrentVersion,
+                    clientId: String = OffsetRequest.DefaultClientId,
+                    topic: String,
+                    partition: Int,
+                    time: Long,
+                    maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.Offsets)) {
+  def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) =
+    this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets)
 
-  def deserializeOffsetArray(buffer: ByteBuffer): Array[Long] = {
-    val size = buffer.getInt
-    val offsets = new Array[Long](size)
-    for (i <- 0 until offsets.length)
-      offsets(i) = buffer.getLong
-    offsets
-  }
-}
 
-class OffsetRequest(val topic: String,
-                    val partition: Int,
-                    val time: Long,
-                    val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) {
 
   def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    Utils.writeShortString(buffer, clientId)
     Utils.writeShortString(buffer, topic)
     buffer.putInt(partition)
     buffer.putLong(time)
     buffer.putInt(maxNumOffsets)
   }
 
-  def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4
-
-  override def toString(): String= "OffsetRequest(topic:" + topic + ", part:" + partition + ", time:" + time +
-          ", maxNumOffsets:" + maxNumOffsets + ")"
-}
+  def sizeInBytes(): Int = 2 + (2 + clientId.length()) + (2 + topic.length) + 4 + 8 + 4
 
-@nonthreadsafe
-private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send {
-  private var size: Long = offsets.foldLeft(4)((sum, _) => sum + 8)
-  private val header = ByteBuffer.allocate(6)
-  header.putInt(size.asInstanceOf[Int] + 2)
-  header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
-  header.rewind()
-  private val contentBuffer = OffsetRequest.serializeOffsetArray(offsets)
-
-  var complete: Boolean = false
-
-  def writeTo(channel: GatheringByteChannel): Int = {
-    expectIncomplete()
-    var written = 0
-    if(header.hasRemaining)
-      written += channel.write(header)
-    if(!header.hasRemaining && contentBuffer.hasRemaining)
-      written += channel.write(contentBuffer)
-
-    if(!contentBuffer.hasRemaining)
-      complete = true
-    written
-  }
+  override def toString(): String= "OffsetRequest(version:" + versionId + ", client id:" + clientId +
+          ", topic:" + topic + ", part:" + partition + ", time:" + time + ", maxNumOffsets:" + maxNumOffsets + ")"
 }
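
OffsetRequest keeps its old four-argument shape through the auxiliary constructor while the primary constructor adds versionId and clientId; the removed OffsetArraySend and offset-array helpers are superseded by the OffsetResponse added below. Illustrative construction:

    // Sketch: both forms build the same request; the topic is illustrative.
    val viaDefaults = new OffsetRequest("test-topic", 0, OffsetRequest.LatestTime, 1)
    val explicit    = OffsetRequest(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId,
                                    "test-topic", 0, OffsetRequest.LatestTime, 1)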

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala Wed Jun  6 01:28:46 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.common.ErrorMapping
+
+
+object OffsetResponse {
+  def readFrom(buffer: ByteBuffer): OffsetResponse = {
+    val versionId = buffer.getShort
+    val errorCode = buffer.getShort
+    val offsetsSize = buffer.getInt
+    val offsets = new Array[Long](offsetsSize)
+    for( i <- 0 until offsetsSize) {
+      offsets(i) = buffer.getLong
+    }
+    new OffsetResponse(versionId, offsets, errorCode)
+  }
+}
+
+case class OffsetResponse(versionId: Short,
+                          offsets: Array[Long],
+                          errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+  val sizeInBytes = 2 + 2 + offsets.foldLeft(4)((sum, _) => sum + 8)
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    /* error code */
+    buffer.putShort(errorCode)
+    buffer.putInt(offsets.length)
+    offsets.foreach(buffer.putLong(_))
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Wed Jun  6 01:28:46 2012
@@ -19,7 +19,6 @@ package kafka.api
 
 import java.nio._
 import kafka.message._
-import kafka.network._
 import kafka.utils._
 
 object ProducerRequest {
@@ -58,7 +57,7 @@ case class ProducerRequest( versionId: S
                             clientId: String,
                             requiredAcks: Short,
                             ackTimeout: Int,
-                            data: Array[TopicData] ) extends Request(RequestKeys.Produce) {
+                            data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
 
   def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeout, data)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala Wed Jun  6 01:28:46 2012
@@ -18,16 +18,14 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import java.nio.channels.GatheringByteChannel
 import kafka.common.ErrorMapping
-import kafka.network.Send
 
-object ProducerResponse {
-  val CurrentVersion = 1.shortValue()
 
+object ProducerResponse {
   def readFrom(buffer: ByteBuffer): ProducerResponse = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
     val errorsSize = buffer.getInt
     val errors = new Array[Short](errorsSize)
     for( i <- 0 until errorsSize) {
@@ -38,28 +36,21 @@ object ProducerResponse {
     for( i <- 0 until offsetsSize) {
       offsets(i) = buffer.getLong
     }
-    new ProducerResponse(versionId, correlationId, errors, offsets)
+    new ProducerResponse(versionId, correlationId, errors, offsets, errorCode)
   }
-
-  def serializeResponse(producerResponse: ProducerResponse): ByteBuffer = {
-    val buffer = ByteBuffer.allocate(producerResponse.sizeInBytes)
-    producerResponse.writeTo(buffer)
-    buffer.rewind()
-    buffer
-  }
-
-  def deserializeResponse(buffer: ByteBuffer): ProducerResponse = readFrom(buffer)
-
 }
 
-case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short], offsets: Array[Long]) {
-  val sizeInBytes = 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
+case class ProducerResponse(versionId: Short,  correlationId: Int, errors: Array[Short],
+                            offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+  val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
 
   def writeTo(buffer: ByteBuffer) {
-    /* version */
+    /* version id */
     buffer.putShort(versionId)
     /* correlation id */
     buffer.putInt(correlationId)
+    /* error code */
+    buffer.putShort(errorCode)
     /* errors */
     buffer.putInt(errors.length)
     errors.foreach(buffer.putShort(_))
@@ -67,35 +58,4 @@ case class ProducerResponse(versionId: S
     buffer.putInt(offsets.length)
     offsets.foreach(buffer.putLong(_))
   }
-}
-
-class ProducerResponseSend(val producerResponse: ProducerResponse,
-                           val error: Int = ErrorMapping.NoError) extends Send {
-  private val header = ByteBuffer.allocate(6)
-  header.putInt(producerResponse.sizeInBytes + 2)
-  header.putShort(error.toShort)
-  header.rewind()
-
-  val responseContent = ProducerResponse.serializeResponse(producerResponse)
-
-  var complete = false
-
-  def writeTo(channel: GatheringByteChannel):Int = {
-    expectIncomplete()
-    var written = 0
-    if(header.hasRemaining)
-      written += channel.write(header)
-
-    trace("Wrote %d bytes for header".format(written))
-
-    if(!header.hasRemaining && responseContent.hasRemaining)
-        written += channel.write(responseContent)
-
-    trace("Wrote %d bytes for header, errors and offsets".format(written))
-
-    if(!header.hasRemaining && !responseContent.hasRemaining)
-      complete = true
-
-    written
-  }
-}
+}
\ No newline at end of file
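
ProducerResponse now carries the top-level errorCode itself (written between correlationId and the error array), and the bespoke ProducerResponseSend is gone; responses go out through the generic BoundedByteBufferSend. The resulting layout, per readFrom above:

    // Sketch: the new producer response layout.
    //   versionId     : Short  (2 bytes)
    //   correlationId : Int    (4 bytes)
    //   errorCode     : Short  (2 bytes, new top-level field)
    //   errors        : Int count, then that many Shorts
    //   offsets       : Int count, then that many Longs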

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala Wed Jun  6 01:28:46 2012
@@ -22,4 +22,6 @@ object RequestKeys {
   val Fetch: Short = 1
   val Offsets: Short = 2
   val TopicMetadata: Short = 3
+  val LeaderAndISRRequest: Short = 4
+  val StopReplicaRequest: Short = 5
 }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala Wed Jun  6 01:28:46 2012
@@ -0,0 +1,28 @@
+package kafka.api
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.nio._
+
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
+
+  def sizeInBytes: Int
+  
+  def writeTo(buffer: ByteBuffer): Unit
+  
+}
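
Both usage patterns appear throughout this commit: request classes pass Some of their RequestKeys entry, response classes lean on the None default. A minimal hypothetical subclass showing the contract (PingRequest and key 99 are illustrative, not part of this commit):

    import java.nio.ByteBuffer

    private[kafka] case class PingRequest(payload: Short)
            extends RequestOrResponse(Some(99.toShort)) {  // 99: illustrative key
      def sizeInBytes: Int = 2                             // one Short payload
      def writeTo(buffer: ByteBuffer) { buffer.putShort(payload) }
    }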

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Wed Jun  6 01:28:46 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+
+import java.nio._
+import kafka.utils._
+import collection.mutable.HashSet
+import collection.mutable.Set
+
+object StopReplicaRequest {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
+    val versionId = buffer.getShort
+    val clientId = Utils.readShortString(buffer)
+    val ackTimeout = buffer.getInt
+    val topicPartitionPairCount = buffer.getInt
+    val topicPartitionPairSet = new HashSet[(String, Int)]()
+    for (i <- 0 until topicPartitionPairCount){
+      topicPartitionPairSet.add((Utils.readShortString(buffer, "UTF-8"), buffer.getInt))
+    }
+    new StopReplicaRequest(versionId, clientId, ackTimeout, topicPartitionPairSet)
+  }
+}
+
+case class StopReplicaRequest(versionId: Short,
+                              clientId: String,
+                              ackTimeout: Int,
+                              stopReplicaSet: Set[(String, Int)]
+                                     ) extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
+  def this(ackTimeout: Int, stopReplicaSet: Set[(String, Int)]) = {
+    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeout, stopReplicaSet)
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    Utils.writeShortString(buffer, clientId)
+    buffer.putInt(ackTimeout)
+    buffer.putInt(stopReplicaSet.size)
+    for ((topic, partitionId) <- stopReplicaSet){
+      Utils.writeShortString(buffer, topic, "UTF-8")
+      buffer.putInt(partitionId)
+    }
+  }
+
+  def sizeInBytes(): Int = {
+    var size = 2 + (2 + clientId.length()) + 4 + 4
+    for ((topic, partitionId) <- stopReplicaSet){
+      size += (2 + topic.length()) + 4
+    }
+    size
+  }
+}
\ No newline at end of file
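
A construction sketch using the auxiliary constructor above; the topic and partition ids are illustrative:

    import collection.mutable.HashSet

    val toStop = new HashSet[(String, Int)]()
    toStop.add(("test-topic", 0))
    toStop.add(("test-topic", 1))
    // Auxiliary constructor fills in CurrentVersion and DefaultClientId.
    val request = new StopReplicaRequest(1000, toStop)  // ackTimeout = 1000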

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala Wed Jun  6 01:28:46 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.utils.Utils
+import collection.mutable.HashMap
+import collection.mutable.Map
+import kafka.common.ErrorMapping
+
+
+object StopReplicaResponse {
+  def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
+    val versionId = buffer.getShort
+    val errorCode = buffer.getShort
+    val numEntries = buffer.getInt
+
+    val responseMap = new HashMap[(String, Int), Short]()
+    for (i<- 0 until numEntries){
+      val topic = Utils.readShortString(buffer, "UTF-8")
+      val partition = buffer.getInt
+      val partitionErrorCode = buffer.getShort()
+      responseMap.put((topic, partition), partitionErrorCode)
+    }
+    new StopReplicaResponse(versionId, responseMap, errorCode)
+  }
+}
+
+
+case class StopReplicaResponse(val versionId: Short,
+                               val responseMap: Map[(String, Int), Short],
+                               val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+  def sizeInBytes: Int ={
+    var size = 2 + 2 + 4
+    for ((key, value) <- responseMap){
+      size += (2 + key._1.length) + 4 + 2
+    }
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putShort(errorCode)
+    buffer.putInt(responseMap.size)
+    for ((key:(String, Int), value) <- responseMap){
+      Utils.writeShortString(buffer, key._1, "UTF-8")
+      buffer.putInt(key._2)
+      buffer.putShort(value)
+    }
+  }
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala Wed Jun  6 01:28:46 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.common.ErrorMapping
+
+
+object TopicMetaDataResponse {
+
+  def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = {
+    val errorCode = buffer.getShort
+    val versionId = buffer.getShort
+
+    val topicCount = buffer.getInt
+    val topicsMetadata = new Array[TopicMetadata](topicCount)
+    for( i <- 0 until topicCount) {
+      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
+    }
+    new TopicMetaDataResponse(versionId, topicsMetadata.toSeq, errorCode)
+  }
+}
+
+case class TopicMetaDataResponse(versionId: Short,
+                                 topicsMetadata: Seq[TopicMetadata],
+                                 errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse
+{
+  val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes) + 2
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    /* error code */
+    buffer.putShort(errorCode)
+    /* topic metadata */
+    buffer.putInt(topicsMetadata.length)
+    topicsMetadata.foreach(_.writeTo(buffer))
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Wed Jun  6 01:28:46 2012
@@ -19,16 +19,16 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import kafka.utils.Utils._
-import kafka.network.{Send, Request}
-import java.nio.channels.GatheringByteChannel
-import kafka.common.ErrorMapping
 import collection.mutable.ListBuffer
+import kafka.utils._
 
 sealed trait DetailedMetadataRequest { def requestId: Short }
 case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
 case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] }
 
 object TopicMetadataRequest {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = ""
 
   /**
    * TopicMetadataRequest has the following format -
@@ -48,6 +48,8 @@ object TopicMetadataRequest {
   }
 
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
+    val versionId = buffer.getShort
+    val clientId = Utils.readShortString(buffer)
     val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
     val topics = new ListBuffer[String]()
     for(i <- 0 until numTopics)
@@ -66,37 +68,26 @@ object TopicMetadataRequest {
     }
     debug("topic = %s, detailed metadata request = %d"
           .format(topicsList.head, returnDetailedMetadata.requestId))
-    new TopicMetadataRequest(topics.toList, returnDetailedMetadata, timestamp, count)
-  }
-
-  def serializeTopicMetadata(topicMetadata: Seq[TopicMetadata]): ByteBuffer = {
-    val size = topicMetadata.foldLeft(4 /* num topics */)(_ + _.sizeInBytes)
-    val buffer = ByteBuffer.allocate(size)
-    debug("Allocating buffer of size %d for topic metadata response".format(size))
-    /* number of topics */
-    buffer.putInt(topicMetadata.size)
-    /* topic partition_metadata */
-    topicMetadata.foreach(m => m.writeTo(buffer))
-    buffer.rewind()
-    buffer
-  }
-
-  def deserializeTopicsMetadataResponse(buffer: ByteBuffer): Seq[TopicMetadata] = {
-    /* number of topics */
-    val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
-    val topicMetadata = new Array[TopicMetadata](numTopics)
-    for(i <- 0 until  numTopics)
-      topicMetadata(i) = TopicMetadata.readFrom(buffer)
-    topicMetadata
+    new TopicMetadataRequest(versionId, clientId, topics.toList, returnDetailedMetadata, timestamp, count)
   }
 }
 
-case class TopicMetadataRequest(val topics: Seq[String],
+case class TopicMetadataRequest(val versionId: Short,
+                                val clientId: String,
+                                val topics: Seq[String],
                                 val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
                                 val timestamp: Option[Long] = None, val count: Option[Int] = None)
-  extends Request(RequestKeys.TopicMetadata){
+ extends RequestOrResponse(Some(RequestKeys.TopicMetadata)){
+
+def this(topics: Seq[String]) =
+  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)
+
+
+
 
   def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    Utils.writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
     buffer.putShort(detailedMetadata.requestId)
@@ -110,7 +101,7 @@ case class TopicMetadataRequest(val topi
   }
 
   def sizeInBytes(): Int = {
-    var size: Int = 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
+    var size: Int = 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
                     2 /* detailed metadata */
     detailedMetadata match {
       case SegmentMetadata =>
@@ -121,34 +112,3 @@ case class TopicMetadataRequest(val topi
     size
   }
 }
-
-class TopicMetadataSend(topicsMetadata: Seq[TopicMetadata]) extends Send {
-  private var size: Int = topicsMetadata.foldLeft(4)(_ + _.sizeInBytes)
-  private val header = ByteBuffer.allocate(6)
-  header.putInt(size + 2)
-  header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
-  header.rewind()
-
-  val metadata = TopicMetadataRequest.serializeTopicMetadata(topicsMetadata)
-  metadata.rewind()
-
-  trace("Wrote size %d in header".format(size + 2))
-  var complete: Boolean = false
-
-  def writeTo(channel: GatheringByteChannel): Int = {
-    expectIncomplete()
-    var written = 0
-    if(header.hasRemaining)
-      written += channel.write(header)
-    trace("Wrote %d bytes for header".format(written))
-
-    if(!header.hasRemaining && metadata.hasRemaining)
-      written += channel.write(metadata)
-
-    trace("Wrote %d bytes for header and metadata".format(written))
-
-    if(!metadata.hasRemaining)
-      complete = true
-    written
-  }
-}
\ No newline at end of file
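
The removed serialize/deserialize helpers and TopicMetadataSend are superseded by the TopicMetaDataResponse object added earlier in this commit. Existing call sites can use the new single-argument constructor; the topic is illustrative:

    // Defaults supply the version id and client id.
    val request = new TopicMetadataRequest(Seq("test-topic"))
    // same as: TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
    //   TopicMetadataRequest.DefaultClientId, Seq("test-topic"),
    //   NoSegmentMetadata, None, None)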

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Wed Jun  6 01:28:46 2012
@@ -28,36 +28,36 @@ import scala.Predef._
 object ErrorMapping {
   val EmptyByteBuffer = ByteBuffer.allocate(0)
 
-  val UnknownCode = -1
-  val NoError = 0
-  val OffsetOutOfRangeCode = 1
-  val InvalidMessageCode = 2
-  val InvalidPartitionCode = 3
-  val InvalidFetchSizeCode = 4
-  val InvalidFetchRequestFormatCode = 5
-  val NotLeaderForPartitionCode = 6
-  val NoLeaderForPartitionCode = 7
-  val UnknownTopicCode = 8
+  val UnknownCode : Short = -1
+  val NoError : Short = 0
+  val OffsetOutOfRangeCode : Short = 1
+  val InvalidMessageCode : Short = 2
+  val InvalidPartitionCode : Short = 3
+  val InvalidFetchSizeCode  : Short = 4
+  val InvalidFetchRequestFormatCode : Short = 5
+  val NoLeaderForPartitionCode : Short = 6
+  val NotLeaderForPartitionCode : Short = 7
+  val UnknownTopicCode : Short = 8
 
   private val exceptionToCode = 
-    Map[Class[Throwable], Int](
+    Map[Class[Throwable], Short](
       classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
       classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> InvalidPartitionCode,
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
       classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
-      classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode
-//      classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
+      classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
+      classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */
   private val codeToException = 
-    (Map[Int, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
+    (Map[Short, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
   
-  def codeFor(exception: Class[Throwable]): Int = exceptionToCode(exception)
+  def codeFor(exception: Class[Throwable]): Short = exceptionToCode(exception)
   
-  def maybeThrowException(code: Int) =
+  def maybeThrowException(code: Short) =
     if(code != 0)
       throw codeToException(code).newInstance()
 }
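
With the codes typed Short end to end (note the hadoop-consumer hunk at the top of this commit), lookups in either direction no longer need widening, and the UnknownTopicException mapping is uncommented. Illustrative round trip:

    // Map an exception class to its wire code, then back to an exception.
    val code: Short = ErrorMapping.codeFor(
      classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]])
    ErrorMapping.maybeThrowException(code)  // re-throws OffsetOutOfRangeException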

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Wed Jun  6 01:28:46 2012
@@ -57,10 +57,10 @@ class SimpleConsumer( val host: String,
     }
   }
   
-  private def sendRequest(request: Request): Tuple2[Receive, Int] = {
+  private def sendRequest(request: RequestOrResponse): Receive = {
     lock synchronized {
       getOrMakeConnection()
-      var response: Tuple2[Receive,Int] = null
+      var response: Receive = null
       try {
         blockingChannel.send(request)
         response = blockingChannel.receive()
@@ -92,7 +92,7 @@ class SimpleConsumer( val host: String,
   def fetch(request: FetchRequest): FetchResponse = {
     val startTime = SystemTime.nanoseconds
     val response = sendRequest(request)
-    val fetchResponse = FetchResponse.readFrom(response._1.buffer)
+    val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
 
     val endTime = SystemTime.nanoseconds
@@ -112,7 +112,7 @@ class SimpleConsumer( val host: String,
   def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
     val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
     val response = sendRequest(request)
-    OffsetRequest.deserializeOffsetArray(response._1.buffer)
+    OffsetResponse.readFrom(response.buffer).offsets
   }
 
   private def getOrMakeConnection() {
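
sendRequest now returns the bare Receive, and getOffsetsBefore parses it through the new OffsetResponse rather than the removed deserializeOffsetArray. A usage sketch, assuming the existing (host, port, socket timeout, buffer size) constructor; all values illustrative:

    val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024)
    val offsets = consumer.getOffsetsBefore("test-topic", 0, OffsetRequest.LatestTime, 1)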

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Wed Jun  6 01:28:46 2012
@@ -16,7 +16,7 @@
  */
 package kafka.javaapi
 
-import kafka.network.Request
+import kafka.api.RequestOrResponse
 import kafka.api.{RequestKeys, TopicData}
 import java.nio.ByteBuffer
 
@@ -24,7 +24,7 @@ class ProducerRequest(val correlationId:
                       val clientId: String,
                       val requiredAcks: Short,
                       val ackTimeout: Int,
-                      val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
+                      val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
 	
   val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Wed Jun  6 01:28:46 2012
@@ -22,7 +22,7 @@ import kafka.message._
 
 class ByteBufferMessageSet(private val buffer: ByteBuffer,
                            private val initialOffset: Long = 0L,
-                           private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
+                           private val errorCode: Short = ErrorMapping.NoError) extends MessageSet {
   val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
                                                                                               initialOffset,
                                                                                               errorCode)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Wed Jun  6 01:28:46 2012
@@ -35,7 +35,7 @@ import kafka.common.{MessageSizeTooLarge
  */
 class ByteBufferMessageSet(private val buffer: ByteBuffer,
                            private val initialOffset: Long = 0L,
-                           private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
+                           private val errorCode: Short = ErrorMapping.NoError) extends MessageSet with Logging {
   private var shallowValidByteCount = -1L
   if(sizeInBytes > Int.MaxValue)
     throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)

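Here and in the javaapi wrapper above, errorCode changes from Int to Short to match the two-byte error field on the wire; in practice that means an explicit .toShort where an Int-typed ErrorMapping code was passed before (as the AsyncProducerTest changes below also show). A small sketch, using only the constructor shown in this hunk:

    import java.nio.ByteBuffer
    import kafka.common.ErrorMapping
    import kafka.message.ByteBufferMessageSet

    val buf = ByteBuffer.allocate(0)
    // The defaults cover the common case (initial offset 0, no error):
    val ok = new ByteBufferMessageSet(buf)
    // An explicit error code must now be a Short:
    val notLeader = new ByteBufferMessageSet(buf, 0L,
      ErrorMapping.NotLeaderForPartitionCode.toShort)
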
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala Wed Jun  6 01:28:46 2012
@@ -20,6 +20,7 @@ package kafka.network
 import java.net.InetSocketAddress
 import java.nio.channels._
 import kafka.utils.{nonthreadsafe, Logging}
+import kafka.api.RequestOrResponse
 
 /**
  *  A simple blocking channel with timeouts correctly enabled.
@@ -70,7 +71,7 @@ class BlockingChannel( val host: String,
 
   def isConnected = connected
   
-  def send(request: Request):Int = {
+  def send(request: RequestOrResponse):Int = {
     if(!connected)
       throw new ClosedChannelException()
 
@@ -78,16 +79,14 @@ class BlockingChannel( val host: String,
     send.writeCompletely(writeChannel)
   }
   
-  def receive(): Tuple2[Receive, Int] = {
+  def receive(): Receive = {
     if(!connected)
       throw new ClosedChannelException()
 
     val response = new BoundedByteBufferReceive()
     response.readCompletely(readChannel)
 
-    // this has the side effect of setting the initial position of buffer correctly
-    val errorCode: Int = response.buffer.getShort
-    (response, errorCode)
+    response
   }
 
 }
\ No newline at end of file

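receive() no longer peels a status short off the front of the buffer as a side effect; it returns the Receive untouched, and each response type parses its own error fields. The new calling convention, sketched with the same positional BlockingChannel arguments the new controller test below uses (the endpoint itself is a placeholder):

    import kafka.network.{BlockingChannel, Receive}
    import kafka.api.{OffsetRequest, OffsetResponse}

    // Argument order mirrors ControllerToBrokerRequestTest below.
    val channel = new BlockingChannel("localhost", 9092, 1000000, 0, 64 * 1024)
    channel.connect

    channel.send(new OffsetRequest("test", 0, OffsetRequest.LatestTime, 1))
    val response: Receive = channel.receive()

    // Error handling now lives in the response object, not the channel:
    val offsets = OffsetResponse.readFrom(response.buffer).offsets
    channel.disconnect()
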
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala Wed Jun  6 01:28:46 2012
@@ -20,6 +20,7 @@ package kafka.network
 import java.nio._
 import java.nio.channels._
 import kafka.utils._
+import kafka.api.RequestOrResponse
 
 @nonthreadsafe
 private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send {
@@ -37,12 +38,18 @@ private[kafka] class BoundedByteBufferSe
 
   def this(size: Int) = this(ByteBuffer.allocate(size))
   
-  def this(request: Request) = {
-    this(request.sizeInBytes + 2)
-    buffer.putShort(request.id)
+  def this(request: RequestOrResponse) = {
+    this(request.sizeInBytes + (if(request.requestId != None) 2 else 0))
+    request.requestId match {
+      case Some(requestId) =>
+        buffer.putShort(requestId)
+      case None =>
+    }
+
     request.writeTo(buffer)
     buffer.rewind()
   }
+
   
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()

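The constructor now sizes its buffer from the optional requestId: requests (requestId = Some(id)) are framed with a leading two-byte id short, while responses (requestId = None) carry only their payload. The arithmetic, restated as a stand-alone illustration rather than library code:

    // [2-byte id][payload] for requests, [payload] alone for responses.
    def frameSize(payloadBytes: Int, requestId: Option[Short]): Int =
      payloadBytes + (if (requestId.isDefined) 2 else 0)

    assert(frameSize(100, Some(0.toShort)) == 102) // request framing
    assert(frameSize(100, None) == 100)            // response framing
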
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Wed Jun  6 01:28:46 2012
@@ -19,7 +19,7 @@ package kafka.producer
 
 import kafka.api._
 import kafka.message.MessageSet
-import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive}
+import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
 import kafka.common.MessageSizeTooLargeException
@@ -46,7 +46,7 @@ class SyncProducer(val config: SyncProdu
 
   trace("Instantiating Scala Sync Producer")
 
-  private def verifyRequest(request: Request) = {
+  private def verifyRequest(request: RequestOrResponse) = {
     /**
     * This seems a little convoluted, but the idea is to turn on verification simply by changing log4j settings
      * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
@@ -66,13 +66,13 @@ class SyncProducer(val config: SyncProdu
   /**
    * Common functionality for the public send methods
    */
-  private def doSend(request: Request): Tuple2[Receive, Int] = {
+  private def doSend(request: RequestOrResponse): Receive = {
     lock synchronized {
       verifyRequest(request)
       val startTime = SystemTime.nanoseconds
       getOrMakeConnection()
 
-      var response: Tuple2[Receive, Int] = null
+      var response: Receive = null
       try {
         blockingChannel.send(request)
         response = blockingChannel.receive()
@@ -108,12 +108,13 @@ class SyncProducer(val config: SyncProdu
       }
     }
     val response = doSend(producerRequest)
-    ProducerResponse.deserializeResponse(response._1.buffer)
+    ProducerResponse.readFrom(response.buffer)
   }
 
   def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
     val response = doSend(request)
-    TopicMetadataRequest.deserializeTopicsMetadataResponse(response._1.buffer)
+    val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer)
+    topicMetaDataResponse.topicsMetadata
   }
 
   def close() = {

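Both public send methods now follow the same shape: doSend returns the raw Receive, and the response companion's readFrom does the parsing. A hedged sketch of a metadata lookup through SyncProducer; the "host"/"port" property keys are assumed from SyncProducerConfig and are not shown in this diff:

    import java.util.Properties
    import kafka.producer.{SyncProducer, SyncProducerConfig}
    import kafka.api.TopicMetadataRequest

    val props = new Properties
    props.put("host", "localhost")
    props.put("port", "9092")
    val producer = new SyncProducer(new SyncProducerConfig(props))

    // Internally: doSend, then TopicMetaDataResponse.readFrom(response.buffer);
    // the caller just gets back the Seq[TopicMetadata].
    val metadata = producer.send(new TopicMetadataRequest(List("test")))
    metadata.foreach(m => println(m.topic))
    producer.close()
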
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Jun  6 01:28:46 2012
@@ -28,8 +28,11 @@ import kafka.network._
 import kafka.utils.{SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
+import mutable.HashMap
 import scala.math._
 import java.lang.IllegalStateException
+import kafka.network.RequestChannel.Response
+
 
 /**
  * Logic to handle the various Kafka requests
@@ -50,10 +53,40 @@ class KafkaApis(val requestChannel: Requ
       case RequestKeys.Fetch => handleFetchRequest(request)
       case RequestKeys.Offsets => handleOffsetRequest(request)
       case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request)
+      case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request)
+      case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request)
       case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
     }
   }
 
+
+  def handleLeaderAndISRRequest(request: RequestChannel.Request) {
+    val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
+    val responseMap = new HashMap[(String, Int), Short]
+
+    // TODO: put in actual logic later
+    for((key, value) <- leaderAndISRRequest.leaderAndISRInfos){
+      responseMap.put(key, ErrorMapping.NoError)
+    }
+
+    val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse), -1))
+  }
+
+
+  def handleStopReplicaRequest(request: RequestChannel.Request) {
+    val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
+    val responseMap = new HashMap[(String, Int), Short]
+
+    // TODO: put in actual logic later
+    for((topic, partition) <- stopReplicaRequest.stopReplicaSet){
+      responseMap.put((topic, partition), ErrorMapping.NoError)
+    }
+    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse), -1))
+  }
+
+
   /**
    * Handle a produce request
    */
@@ -65,7 +98,7 @@ class KafkaApis(val requestChannel: Requ
 
     val response = produce(produceRequest)
     debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
-    requestChannel.sendResponse(new RequestChannel.Response(request, new ProducerResponseSend(response), -1))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
     
     // Now check any outstanding fetches this produce just unblocked
     var satisfied = new mutable.ArrayBuffer[DelayedFetch]
@@ -77,7 +110,7 @@ class KafkaApis(val requestChannel: Requ
     for(fetchReq <- satisfied) {
        val topicData = readMessageSets(fetchReq.fetch)
        val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
-       requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
+      requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response), -1))
     }
   }
 
@@ -115,7 +148,7 @@ class KafkaApis(val requestChannel: Requ
         }
       }
     }
-    new ProducerResponse(ProducerResponse.CurrentVersion, request.correlationId, errors, offsets)
+    new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
   }
 
   /**
@@ -131,9 +164,8 @@ class KafkaApis(val requestChannel: Requ
       fetchRequest.validate()
     } catch {
       case e:FetchRequestFormatException =>
-        val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, Array.empty)
-        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response,
-          ErrorMapping.InvalidFetchRequestFormatCode), -1)
+        val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty)
+        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response), -1)
         requestChannel.sendResponse(channelResponse)
     }
 
@@ -147,7 +179,7 @@ class KafkaApis(val requestChannel: Requ
       debug("Returning fetch response %s for fetch request with correlation id %d"
         .format(topicData.map(_.partitionData.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response), -1))
     } else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val keys: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
@@ -240,8 +272,8 @@ class KafkaApis(val requestChannel: Requ
   /**
    * Read from a single topic/partition at the given offset
    */
-  private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = {
-    var response: Either[Int, MessageSet] = null
+  private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = {
+    var response: Either[Short, MessageSet] = null
     try {
       // check if the current broker is the leader for the partitions
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
@@ -264,8 +296,8 @@ class KafkaApis(val requestChannel: Requ
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Offset request " + offsetRequest.toString)
     val offsets = logManager.getOffsets(offsetRequest)
-    val response = new OffsetArraySend(offsets)
-    requestChannel.sendResponse(new RequestChannel.Response(request, response, -1))
+    val response = new OffsetResponse(offsetRequest.versionId, offsets)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
   }
 
   /**
@@ -303,7 +335,8 @@ class KafkaApis(val requestChannel: Requ
       }
     }
     info("Sending response for topic metadata request")
-    requestChannel.sendResponse(new RequestChannel.Response(request, new TopicMetadataSend(topicsMetadata), -1))
+    val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
   }
 
   def close() {
@@ -337,7 +370,7 @@ class KafkaApis(val requestChannel: Requ
     def expire(delayed: DelayedFetch) {
       val topicData = readMessageSets(delayed.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
-      requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
+      requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response), -1))
     }
   }
 }

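Both new admin handlers share one stub shape: deserialize the request, acknowledge every (topic, partition) key with NoError, and send the response back wrapped in a BoundedByteBufferSend. That acknowledgement step, factored into an illustrative helper (not part of the commit), using only types visible in the hunks above:

    import scala.collection.mutable.HashMap
    import kafka.common.ErrorMapping

    // Placeholder acknowledgement until the real controller logic lands
    // (see the TODOs above): every key maps to NoError.
    def ackAll(keys: Iterable[(String, Int)]): HashMap[(String, Int), Short] = {
      val responseMap = new HashMap[(String, Int), Short]
      for ((topic, partition) <- keys)
        responseMap.put((topic, partition), ErrorMapping.NoError)
      responseMap
    }
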
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala Wed Jun  6 01:28:46 2012
@@ -29,13 +29,13 @@ import kafka.common.ErrorMapping
  * wholly in kernel space
  */
 @nonthreadsafe
-private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Int) extends Send {
+private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send {
   
   private var sent: Long = 0
   private var size: Long = messages.sizeInBytes
   private val header = ByteBuffer.allocate(6)
   header.putInt(size.asInstanceOf[Int] + 2)
-  header.putShort(errorCode.asInstanceOf[Short])
+  header.putShort(errorCode)
   header.rewind()
   
   var complete: Boolean = false

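The wire layout is unchanged here; only the parameter type now matches the two-byte field, so the asInstanceOf cast goes away. The six-byte header, restated as a self-contained sketch of what the lines above build:

    import java.nio.ByteBuffer

    // [4-byte size = payload + 2][2-byte error code], followed by the payload.
    def buildHeader(payloadSize: Int, errorCode: Short): ByteBuffer = {
      val header = ByteBuffer.allocate(6)
      header.putInt(payloadSize + 2) // the size field counts the error short
      header.putShort(errorCode)     // no cast needed now that the type is Short
      header.rewind()
      header
    }
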
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala Wed Jun  6 01:28:46 2012
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.controller
+
+
+import org.scalatest.junit.JUnit3Suite
+
+import junit.framework.Assert._
+import java.nio.ByteBuffer
+import kafka.common.ErrorMapping
+import kafka.api._
+import collection.mutable.Map
+import collection.mutable.Set
+import kafka.integration.KafkaServerTestHarness
+import kafka.utils.TestUtils
+import kafka.server.KafkaConfig
+import kafka.network.{Receive, BlockingChannel}
+
+
+class ControllerToBrokerRequestTest extends JUnit3Suite with KafkaServerTestHarness  {
+
+  val kafkaProps = TestUtils.createBrokerConfigs(1)
+  val configs = List(new KafkaConfig(kafkaProps.head))
+  var blockingChannel: BlockingChannel = null
+
+  override def setUp() {
+    super.setUp()
+    blockingChannel = new BlockingChannel("localhost", configs.head.port, 1000000, 0, 64*1024)
+    blockingChannel.connect
+  }
+
+  override def tearDown() {
+    super.tearDown()
+    blockingChannel.disconnect()
+  }
+
+
+  def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = {
+    val topic1 = "test1"
+    val topic2 = "test2"
+
+    val leader1 = 1
+    val ISR1 = List(1, 2, 3)
+
+    val leader2 = 2
+    val ISR2 = List(2, 3, 4)
+
+    val leaderAndISR1 = new LeaderAndISR(leader1, 1, ISR1, 1)
+    val leaderAndISR2 = new LeaderAndISR(leader2, 1, ISR2, 2)
+    val map = Map(((topic1, 1), leaderAndISR1), ((topic1, 2), leaderAndISR1),
+                  ((topic2, 1), leaderAndISR2), ((topic2, 2), leaderAndISR2))
+
+    new LeaderAndISRRequest(1, "client 1", 1, 4, map)
+  }
+
+  def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
+    val topic1 = "test1"
+    val topic2 = "test2"
+    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
+                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+
+    new LeaderAndISRResponse(1, responseMap)
+  }
+
+
+  def createSampleStopReplicaRequest() : StopReplicaRequest = {
+    val topic1 = "test1"
+    val topic2 = "test2"
+    new StopReplicaRequest(1, "client 1", 1000, Set((topic1, 1), (topic1, 2),
+                                           (topic2, 1), (topic2, 2)))
+  }
+
+  def createSampleStopReplicaResponse() : StopReplicaResponse = {
+    val topic1 = "test1"
+    val topic2 = "test2"
+    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
+                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+
+    new StopReplicaResponse(1, responseMap)
+  }
+
+
+  def testLeaderAndISRRequest {
+    val leaderAndISRRequest = createSampleLeaderAndISRRequest()
+
+    val serializedLeaderAndISRRequest = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes)
+    leaderAndISRRequest.writeTo(serializedLeaderAndISRRequest)
+    serializedLeaderAndISRRequest.rewind()
+    val deserializedLeaderAndISRRequest = LeaderAndISRRequest.readFrom(serializedLeaderAndISRRequest)
+
+    assertEquals(leaderAndISRRequest, deserializedLeaderAndISRRequest)
+  }
+
+  def testLeaderAndISRResponse {
+    val leaderAndISRResponse = createSampleLeaderAndISRResponse()
+
+    val serializedLeaderAndISRResponse = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes)
+    leaderAndISRResponse.writeTo(serializedLeaderAndISRResponse)
+    serializedLeaderAndISRResponse.rewind()
+    val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(serializedLeaderAndISRResponse)
+    assertEquals(leaderAndISRResponse, deserializedLeaderAndISRResponse)
+  }
+
+
+  def testStopReplicaRequest {
+    val stopReplicaRequest = createSampleStopReplicaRequest()
+
+    val serializedStopReplicaRequest = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes)
+    stopReplicaRequest.writeTo(serializedStopReplicaRequest)
+    serializedStopReplicaRequest.rewind()
+    val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(serializedStopReplicaRequest)
+    assertEquals(stopReplicaRequest, deserializedStopReplicaRequest)
+  }
+
+
+  def testStopReplicaResponse {
+    val stopReplicaResponse = createSampleStopReplicaResponse()
+
+    val serializedStopReplicaResponse = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes)
+    stopReplicaResponse.writeTo(serializedStopReplicaResponse)
+    serializedStopReplicaResponse.rewind()
+    val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(serializedStopReplicaResponse)
+    assertEquals(stopReplicaResponse, deserializedStopReplicaResponse)
+  }
+
+
+
+  def testEndToEndLeaderAndISRRequest {
+
+    val leaderAndISRRequest = createSampleLeaderAndISRRequest()
+
+    var response: Receive = null
+    blockingChannel.send(leaderAndISRRequest)
+    response = blockingChannel.receive()
+
+    val leaderAndISRResponse = LeaderAndISRResponse.readFrom(response.buffer)
+    val expectedLeaderAndISRResponse = createSampleLeaderAndISRResponse()
+
+    assertEquals(leaderAndISRResponse, expectedLeaderAndISRResponse)
+
+  }
+
+
+
+  def testEndToEndStopReplicaRequest {
+    val stopReplicaRequest = createSampleStopReplicaRequest()
+
+    var response: Receive = null
+    blockingChannel.send(stopReplicaRequest)
+    response = blockingChannel.receive()
+
+    val stopReplicaResponse = StopReplicaResponse.readFrom(response.buffer)
+    val expectedStopReplicaResponse = createSampleStopReplicaResponse()
+    assertEquals(stopReplicaResponse, expectedStopReplicaResponse)
+
+  }
+
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Wed Jun  6 01:28:46 2012
@@ -25,12 +25,13 @@ import kafka.log.LogManager
 import junit.framework.Assert._
 import org.easymock.EasyMock
 import kafka.network._
-import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
+import kafka.api.{TopicMetaDataResponse, TopicMetadataRequest}
 import kafka.cluster.Broker
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
 
+
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
@@ -104,10 +105,10 @@ class TopicMetadataTest extends JUnit3Su
 
     // call the API (to be tested) to get metadata
     apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
-    val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[TopicMetadataSend].metadata
+    val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[BoundedByteBufferSend].buffer
     
     // check assertions
-    val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(metadataResponse)
+    val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
     val partitionMetadata = topicMetadata.head.partitionsMetadata

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Wed Jun  6 01:28:46 2012
@@ -121,7 +121,7 @@ class ByteBufferMessageSetTest extends B
       buffer.put(emptyMessageSet.getSerialized())
       buffer.put(regularMessgeSet.getSerialized())
       buffer.rewind
-      val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
+      val mixedMessageSet = new ByteBufferMessageSet(buffer)
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
@@ -142,7 +142,7 @@ class ByteBufferMessageSetTest extends B
       buffer.put(emptyMessageSet.getSerialized())
       buffer.put(regularMessgeSet.getSerialized())
       buffer.rewind
-      val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
+      val mixedMessageSet = new ByteBufferMessageSet(buffer)
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Wed Jun  6 01:28:46 2012
@@ -382,9 +382,9 @@ class AsyncProducerTest extends JUnit3Su
     mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
     EasyMock.expectLastCall().andReturn(List(topic1Metadata))
     mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5))))
-    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
+    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
     mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5))))
-    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
+    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
 	  EasyMock.replay(mockSyncProducer)
 
     val producerPool = EasyMock.createMock(classOf[ProducerPool])
@@ -442,9 +442,9 @@ class AsyncProducerTest extends JUnit3Su
     // On the third try for partition 0, let it succeed.
     val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
     val response1 = 
-      new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L)) 
+      new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
     val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
-    val response2 = new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L))
+    val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
     EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Wed Jun  6 01:28:46 2012
@@ -31,7 +31,6 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.cluster.Broker
 import collection.mutable.ListBuffer
 import kafka.consumer.ConsumerConfig
-import scala.collection.Map
 import kafka.api.{TopicData, PartitionData}
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit