You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/06 03:28:48 UTC
svn commit: r1346697 - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/common/
core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/
core/src/m...
Author: junrao
Date: Wed Jun 6 01:28:46 2012
New Revision: 1346697
URL: http://svn.apache.org/viewvc?rev=1346697&view=rev
Log:
Add admin RPC requests; clean up Response objects; patched by Yang Ye; reviewed by Jun Rao; KAFKA-349; KAFKA-336
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Request.scala
Modified:
incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Wed Jun 6 01:28:46 2012
@@ -255,7 +255,7 @@ public class KafkaETLContext {
*/
protected boolean hasError(ByteBufferMessageSet messages)
throws IOException {
- int errorCode = messages.getErrorCode();
+ short errorCode = messages.getErrorCode();
if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
/* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
Kafka server may delete old files from time to time */
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Wed Jun 6 01:28:46 2012
@@ -18,7 +18,6 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.network.Request
import kafka.utils.Utils
import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
import kafka.common.FetchRequestFormatException
@@ -105,7 +104,7 @@ case class FetchRequest(versionId: Short
replicaId: Int = FetchRequest.DefaultReplicaId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
- offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
+ offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) {
// ensure that a topic "X" appears in at most one OffsetDetail
def validate() {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Wed Jun 6 01:28:46 2012
@@ -26,8 +26,8 @@ import kafka.utils.Utils
object PartitionData {
def readFrom(buffer: ByteBuffer): PartitionData = {
+ val error = buffer.getShort
val partition = buffer.getInt
- val error = buffer.getInt
val initialOffset = buffer.getLong
val hw = buffer.getLong()
val messageSetSize = buffer.getInt
@@ -38,21 +38,48 @@ object PartitionData {
}
}
-case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L,
- messages: MessageSet) {
- val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() + 8
+case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
+ val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8
def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
+}
+
+// SENDS
+
+class PartitionDataSend(val partitionData: PartitionData) extends Send {
+ private val messageSize = partitionData.messages.sizeInBytes
+ private var messagesSentSize = 0L
+
+ private val buffer = ByteBuffer.allocate(26)
+ buffer.putShort(partitionData.error)
+ buffer.putInt(partitionData.partition)
+ buffer.putLong(partitionData.initialOffset)
+ buffer.putLong(partitionData.hw)
+ buffer.putInt(partitionData.messages.sizeInBytes.intValue())
+ buffer.rewind()
+
+ def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
+ def writeTo(channel: GatheringByteChannel): Int = {
+ var written = 0
+ if(buffer.hasRemaining)
+ written += channel.write(buffer)
+ if(!buffer.hasRemaining && messagesSentSize < messageSize) {
+ val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
+ messagesSentSize += bytesSent
+ written += bytesSent
+ }
+ written
+ }
}
-object TopicData {
+object TopicData {
def readFrom(buffer: ByteBuffer): TopicData = {
val topic = Utils.readShortString(buffer, "UTF-8")
val partitionCount = buffer.getInt
val partitions = new Array[PartitionData](partitionCount)
- for(i <- 0 until partitions.length)
+ for(i <- 0 until partitionCount)
partitions(i) = PartitionData.readFrom(buffer)
new TopicData(topic, partitions.sortBy(_.partition))
}
@@ -90,26 +117,61 @@ case class TopicData(topic: String, part
}
}
-object FetchResponse {
- val CurrentVersion = 1.shortValue()
+class TopicDataSend(val topicData: TopicData) extends Send {
+ val size = topicData.sizeInBytes
+ var sent = 0
+
+ private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
+ Utils.writeShortString(buffer, topicData.topic, "UTF-8")
+ buffer.putInt(topicData.partitionData.length)
+ buffer.rewind()
+
+ val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) {
+ val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes)
+ }
+
+ def complete = sent >= size
+
+ def writeTo(channel: GatheringByteChannel): Int = {
+ expectIncomplete()
+ var written = 0
+ if(buffer.hasRemaining)
+ written += channel.write(buffer)
+ if(!buffer.hasRemaining && !sends.complete) {
+ written += sends.writeCompletely(channel)
+ }
+ sent += written
+ written
+ }
+}
+
+
+
+
+object FetchResponse {
def readFrom(buffer: ByteBuffer): FetchResponse = {
val versionId = buffer.getShort
+ val errorCode = buffer.getShort
val correlationId = buffer.getInt
val dataCount = buffer.getInt
val data = new Array[TopicData](dataCount)
for(i <- 0 until data.length)
data(i) = TopicData.readFrom(buffer)
- new FetchResponse(versionId, correlationId, data)
+ new FetchResponse(versionId, correlationId, data, errorCode)
}
}
-case class FetchResponse(versionId: Short, correlationId: Int, data: Array[TopicData]) {
- val sizeInBytes = 2 + 4 + data.foldLeft(4)(_ + _.sizeInBytes)
+case class FetchResponse(versionId: Short,
+ correlationId: Int,
+ data: Array[TopicData],
+ errorCode: Short = ErrorMapping.NoError) {
+
+ val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes)
lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
-
+
def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
val messageSet = topicMap.get(topic) match {
case Some(topicData) =>
@@ -129,75 +191,15 @@ case class FetchResponse(versionId: Shor
}
}
-// SENDS
-
-class PartitionDataSend(val partitionData: PartitionData) extends Send {
- private val messageSize = partitionData.messages.sizeInBytes
- private var messagesSentSize = 0L
-
- private val buffer = ByteBuffer.allocate(28)
- buffer.putInt(partitionData.partition)
- buffer.putInt(partitionData.error)
- buffer.putLong(partitionData.initialOffset)
- buffer.putLong(partitionData.hw)
- buffer.putInt(partitionData.messages.sizeInBytes.intValue())
- buffer.rewind()
-
- def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
-
- def writeTo(channel: GatheringByteChannel): Int = {
- var written = 0
- if(buffer.hasRemaining)
- written += channel.write(buffer)
- if(!buffer.hasRemaining && messagesSentSize < messageSize) {
- val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
- messagesSentSize += bytesSent
- written += bytesSent
- }
- written
- }
-}
-
-class TopicDataSend(val topicData: TopicData) extends Send {
- val size = topicData.sizeInBytes
-
- var sent = 0
-
- private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
- Utils.writeShortString(buffer, topicData.topic, "UTF-8")
- buffer.putInt(topicData.partitionData.length)
- buffer.rewind()
-
- val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) {
- val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes)
- }
-
- def complete = sent >= size
-
- def writeTo(channel: GatheringByteChannel): Int = {
- expectIncomplete()
- var written = 0
- if(buffer.hasRemaining)
- written += channel.write(buffer)
- if(!buffer.hasRemaining && !sends.complete) {
- written += sends.writeCompletely(channel)
- }
- sent += written
- written
- }
-}
-
-class FetchResponseSend(val fetchResponse: FetchResponse,
- val errorCode: Int = ErrorMapping.NoError) extends Send {
+class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
private val size = fetchResponse.sizeInBytes
-
private var sent = 0
private val buffer = ByteBuffer.allocate(16)
- buffer.putInt(size + 2)
- buffer.putShort(errorCode.shortValue())
+ buffer.putInt(size)
buffer.putShort(fetchResponse.versionId)
+ buffer.putShort(fetchResponse.errorCode)
buffer.putInt(fetchResponse.correlationId)
buffer.putInt(fetchResponse.data.length)
buffer.rewind()
@@ -220,6 +222,5 @@ class FetchResponseSend(val fetchRespons
written
}
- def sendSize = 4 + 2 + fetchResponse.sizeInBytes
-
+ def sendSize = 4 + fetchResponse.sizeInBytes
}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala Wed Jun 6 01:28:46 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package kafka.api
+
+import java.nio._
+import kafka.utils._
+import collection.mutable.Map
+import collection.mutable.HashMap
+
+
+object LeaderAndISR {
+ def readFrom(buffer: ByteBuffer): LeaderAndISR = {
+ val leader = buffer.getInt
+ val leaderGenId = buffer.getInt
+ val ISRString = Utils.readShortString(buffer, "UTF-8")
+ val ISR = ISRString.split(",").map(_.toInt).toList
+ val zkVersion = buffer.getLong
+ new LeaderAndISR(leader, leaderGenId, ISR, zkVersion)
+ }
+}
+
+case class LeaderAndISR(leader: Int, leaderEpoc: Int, ISR: List[Int], zkVersion: Long){
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(leader)
+ buffer.putInt(leaderEpoc)
+ Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8")
+ buffer.putLong(zkVersion)
+ }
+
+ def sizeInBytes(): Int = {
+ val size = 4 + 4 + (2 + ISR.mkString(",").length) + 8
+ size
+ }
+}
+
+
+object LeaderAndISRRequest {
+ val CurrentVersion = 1.shortValue()
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = {
+ val versionId = buffer.getShort
+ val clientId = Utils.readShortString(buffer)
+ val isInit = buffer.get()
+ val ackTimeout = buffer.getInt
+ val leaderAndISRRequestCount = buffer.getInt
+ val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
+
+ for(i <- 0 until leaderAndISRRequestCount){
+ val topic = Utils.readShortString(buffer, "UTF-8")
+ val partition = buffer.getInt
+ val leaderAndISRRequest = LeaderAndISR.readFrom(buffer)
+
+ leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
+ }
+ new LeaderAndISRRequest(versionId, clientId, isInit, ackTimeout, leaderAndISRInfos)
+ }
+}
+
+
+case class LeaderAndISRRequest (versionId: Short,
+ clientId: String,
+ isInit: Byte,
+ ackTimeout: Int,
+ leaderAndISRInfos:
+ Map[(String, Int), LeaderAndISR])
+ extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
+ def this(isInit: Byte, ackTimeout: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
+ this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeout, leaderAndISRInfos)
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ Utils.writeShortString(buffer, clientId)
+ buffer.put(isInit)
+ buffer.putInt(ackTimeout)
+ buffer.putInt(leaderAndISRInfos.size)
+ for((key, value) <- leaderAndISRInfos){
+ Utils.writeShortString(buffer, key._1, "UTF-8")
+ buffer.putInt(key._2)
+ value.writeTo(buffer)
+ }
+ }
+
+ def sizeInBytes(): Int = {
+ var size = 1 + 2 + (2 + clientId.length) + 4 + 4
+ for((key, value) <- leaderAndISRInfos)
+ size += (2 + key._1.length) + 4 + value.sizeInBytes
+ size
+ }
+}
\ No newline at end of file
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala Wed Jun 6 01:28:46 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import kafka.common.ErrorMapping
+import java.nio.ByteBuffer
+import kafka.utils.Utils
+import collection.mutable.HashMap
+import collection.mutable.Map
+
+
+object LeaderAndISRResponse {
+ def readFrom(buffer: ByteBuffer): LeaderAndISRResponse = {
+ val versionId = buffer.getShort
+ val errorCode = buffer.getShort
+ val numEntries = buffer.getInt
+ val responseMap = new HashMap[(String, Int), Short]()
+ for (i<- 0 until numEntries){
+ val topic = Utils.readShortString(buffer, "UTF-8")
+ val partition = buffer.getInt
+ val partitionErrorCode = buffer.getShort
+ responseMap.put((topic, partition), partitionErrorCode)
+ }
+ new LeaderAndISRResponse(versionId, responseMap, errorCode)
+ }
+}
+
+
+case class LeaderAndISRResponse(versionId: Short,
+ responseMap: Map[(String, Int), Short],
+ errorCode: Short = ErrorMapping.NoError)
+ extends RequestOrResponse{
+ def sizeInBytes(): Int ={
+ var size = 2 + 2 + 4
+ for ((key, value) <- responseMap){
+ size += 2 + key._1.length + 4 + 2
+ }
+ size
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ buffer.putShort(errorCode)
+ buffer.putInt(responseMap.size)
+ for ((key:(String, Int), value) <- responseMap){
+ Utils.writeShortString(buffer, key._1, "UTF-8")
+ buffer.putInt(key._2)
+ buffer.putShort(value)
+ }
+ }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Wed Jun 6 01:28:46 2012
@@ -18,83 +18,50 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.utils.{nonthreadsafe, Utils}
-import kafka.network.{Send, Request}
-import java.nio.channels.GatheringByteChannel
-import kafka.common.ErrorMapping
+import kafka.utils.Utils
object OffsetRequest {
+ val CurrentVersion = 1.shortValue()
+ val DefaultClientId = ""
+
val SmallestTimeString = "smallest"
val LargestTimeString = "largest"
val LatestTime = -1L
val EarliestTime = -2L
def readFrom(buffer: ByteBuffer): OffsetRequest = {
+ val versionId = buffer.getShort
+ val clientId = Utils.readShortString(buffer)
val topic = Utils.readShortString(buffer, "UTF-8")
val partition = buffer.getInt()
val offset = buffer.getLong
val maxNumOffsets = buffer.getInt
- new OffsetRequest(topic, partition, offset, maxNumOffsets)
+ new OffsetRequest(versionId, clientId, topic, partition, offset, maxNumOffsets)
}
+}
- def serializeOffsetArray(offsets: Array[Long]): ByteBuffer = {
- val size = 4 + 8 * offsets.length
- val buffer = ByteBuffer.allocate(size)
- buffer.putInt(offsets.length)
- for (i <- 0 until offsets.length)
- buffer.putLong(offsets(i))
- buffer.rewind
- buffer
- }
+case class OffsetRequest(versionId: Short = OffsetRequest.CurrentVersion,
+ clientId: String = OffsetRequest.DefaultClientId,
+ topic: String,
+ partition: Int,
+ time: Long,
+ maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.Offsets)) {
+ def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) =
+ this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets)
- def deserializeOffsetArray(buffer: ByteBuffer): Array[Long] = {
- val size = buffer.getInt
- val offsets = new Array[Long](size)
- for (i <- 0 until offsets.length)
- offsets(i) = buffer.getLong
- offsets
- }
-}
-class OffsetRequest(val topic: String,
- val partition: Int,
- val time: Long,
- val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) {
def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ Utils.writeShortString(buffer, clientId)
Utils.writeShortString(buffer, topic)
buffer.putInt(partition)
buffer.putLong(time)
buffer.putInt(maxNumOffsets)
}
- def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4
-
- override def toString(): String= "OffsetRequest(topic:" + topic + ", part:" + partition + ", time:" + time +
- ", maxNumOffsets:" + maxNumOffsets + ")"
-}
+ def sizeInBytes(): Int = 2 + (2 + clientId.length()) + (2 + topic.length) + 4 + 8 + 4
-@nonthreadsafe
-private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send {
- private var size: Long = offsets.foldLeft(4)((sum, _) => sum + 8)
- private val header = ByteBuffer.allocate(6)
- header.putInt(size.asInstanceOf[Int] + 2)
- header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
- header.rewind()
- private val contentBuffer = OffsetRequest.serializeOffsetArray(offsets)
-
- var complete: Boolean = false
-
- def writeTo(channel: GatheringByteChannel): Int = {
- expectIncomplete()
- var written = 0
- if(header.hasRemaining)
- written += channel.write(header)
- if(!header.hasRemaining && contentBuffer.hasRemaining)
- written += channel.write(contentBuffer)
-
- if(!contentBuffer.hasRemaining)
- complete = true
- written
- }
+ override def toString(): String= "OffsetRequest(version:" + versionId + ", client id:" + clientId +
+ ", topic:" + topic + ", part:" + partition + ", time:" + time + ", maxNumOffsets:" + maxNumOffsets + ")"
}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala Wed Jun 6 01:28:46 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.common.ErrorMapping
+
+
+object OffsetResponse {
+ def readFrom(buffer: ByteBuffer): OffsetResponse = {
+ val versionId = buffer.getShort
+ val errorCode = buffer.getShort
+ val offsetsSize = buffer.getInt
+ val offsets = new Array[Long](offsetsSize)
+ for( i <- 0 until offsetsSize) {
+ offsets(i) = buffer.getLong
+ }
+ new OffsetResponse(versionId, offsets, errorCode)
+ }
+}
+
+case class OffsetResponse(versionId: Short,
+ offsets: Array[Long],
+ errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+ val sizeInBytes = 2 + 2 + offsets.foldLeft(4)((sum, _) => sum + 8)
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ /* error code */
+ buffer.putShort(errorCode)
+ buffer.putInt(offsets.length)
+ offsets.foreach(buffer.putLong(_))
+ }
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Wed Jun 6 01:28:46 2012
@@ -19,7 +19,6 @@ package kafka.api
import java.nio._
import kafka.message._
-import kafka.network._
import kafka.utils._
object ProducerRequest {
@@ -58,7 +57,7 @@ case class ProducerRequest( versionId: S
clientId: String,
requiredAcks: Short,
ackTimeout: Int,
- data: Array[TopicData] ) extends Request(RequestKeys.Produce) {
+ data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) =
this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeout, data)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala Wed Jun 6 01:28:46 2012
@@ -18,16 +18,14 @@
package kafka.api
import java.nio.ByteBuffer
-import java.nio.channels.GatheringByteChannel
import kafka.common.ErrorMapping
-import kafka.network.Send
-object ProducerResponse {
- val CurrentVersion = 1.shortValue()
+object ProducerResponse {
def readFrom(buffer: ByteBuffer): ProducerResponse = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
+ val errorCode = buffer.getShort
val errorsSize = buffer.getInt
val errors = new Array[Short](errorsSize)
for( i <- 0 until errorsSize) {
@@ -38,28 +36,21 @@ object ProducerResponse {
for( i <- 0 until offsetsSize) {
offsets(i) = buffer.getLong
}
- new ProducerResponse(versionId, correlationId, errors, offsets)
+ new ProducerResponse(versionId, correlationId, errors, offsets, errorCode)
}
-
- def serializeResponse(producerResponse: ProducerResponse): ByteBuffer = {
- val buffer = ByteBuffer.allocate(producerResponse.sizeInBytes)
- producerResponse.writeTo(buffer)
- buffer.rewind()
- buffer
- }
-
- def deserializeResponse(buffer: ByteBuffer): ProducerResponse = readFrom(buffer)
-
}
-case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short], offsets: Array[Long]) {
- val sizeInBytes = 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
+case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short],
+ offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+ val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
def writeTo(buffer: ByteBuffer) {
- /* version */
+ /* version id */
buffer.putShort(versionId)
/* correlation id */
buffer.putInt(correlationId)
+ /* error code */
+ buffer.putShort(errorCode)
/* errors */
buffer.putInt(errors.length)
errors.foreach(buffer.putShort(_))
@@ -67,35 +58,4 @@ case class ProducerResponse(versionId: S
buffer.putInt(offsets.length)
offsets.foreach(buffer.putLong(_))
}
-}
-
-class ProducerResponseSend(val producerResponse: ProducerResponse,
- val error: Int = ErrorMapping.NoError) extends Send {
- private val header = ByteBuffer.allocate(6)
- header.putInt(producerResponse.sizeInBytes + 2)
- header.putShort(error.toShort)
- header.rewind()
-
- val responseContent = ProducerResponse.serializeResponse(producerResponse)
-
- var complete = false
-
- def writeTo(channel: GatheringByteChannel):Int = {
- expectIncomplete()
- var written = 0
- if(header.hasRemaining)
- written += channel.write(header)
-
- trace("Wrote %d bytes for header".format(written))
-
- if(!header.hasRemaining && responseContent.hasRemaining)
- written += channel.write(responseContent)
-
- trace("Wrote %d bytes for header, errors and offsets".format(written))
-
- if(!header.hasRemaining && !responseContent.hasRemaining)
- complete = true
-
- written
- }
-}
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala Wed Jun 6 01:28:46 2012
@@ -22,4 +22,6 @@ object RequestKeys {
val Fetch: Short = 1
val Offsets: Short = 2
val TopicMetadata: Short = 3
+ val LeaderAndISRRequest: Short = 4
+ val StopReplicaRequest: Short = 5
}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala Wed Jun 6 01:28:46 2012
@@ -0,0 +1,28 @@
+package kafka.api
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.nio._
+
+/**
+ * Common base for the request and response messages in kafka.api.
+ *
+ * @param requestId optional request key identifying the message type; defaults to None
+ */
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
+
+  /** Serializes this message into `buffer` at the buffer's current position. */
+  def writeTo(buffer: ByteBuffer): Unit
+
+  /** Number of bytes that `writeTo` puts into the buffer. */
+  def sizeInBytes: Int
+
+}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Wed Jun 6 01:28:46 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+
+import java.nio._
+import kafka.utils._
+import collection.mutable.HashSet
+import collection.mutable.Set
+
+/**
+ * Protocol defaults and the wire-format decoder for StopReplicaRequest.
+ */
+object StopReplicaRequest {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = ""
+
+  /**
+   * Decodes a StopReplicaRequest from `buffer`. Fields are consumed in this order:
+   * version id (short), client id (short string), ack timeout (int), pair count (int),
+   * then for every pair a topic (short string) followed by a partition id (int).
+   */
+  def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
+    val version = buffer.getShort
+    val client = Utils.readShortString(buffer)
+    val timeout = buffer.getInt
+    val pairCount = buffer.getInt
+    val pairs = new HashSet[(String, Int)]()
+    (0 until pairCount).foreach { _ =>
+      // the topic precedes the partition id on the wire, so it must be read first
+      val topic = Utils.readShortString(buffer, "UTF-8")
+      val partition = buffer.getInt
+      pairs.add((topic, partition))
+    }
+    new StopReplicaRequest(version, client, timeout, pairs)
+  }
+}
+
+/**
+ * Request message (key RequestKeys.StopReplicaRequest) carrying a set of
+ * (topic, partition) pairs together with an ack timeout.
+ *
+ * @param versionId      protocol version written as the first field
+ * @param clientId       client identifier, written as a short string
+ * @param ackTimeout     ack timeout, written as an int
+ * @param stopReplicaSet the (topic, partition id) pairs carried by the request
+ */
+case class StopReplicaRequest(versionId: Short,
+                              clientId: String,
+                              ackTimeout: Int,
+                              stopReplicaSet: Set[(String, Int)]
+                             ) extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
+
+  /** Builds a request with the current protocol version and the default client id. */
+  def this(ackTimeout: Int, stopReplicaSet: Set[(String, Int)]) = {
+    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeout, stopReplicaSet)
+  }
+
+  /**
+   * Writes the request in the order expected by StopReplicaRequest.readFrom:
+   * version id, client id, ack timeout, pair count, then each (topic, partition id).
+   */
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    Utils.writeShortString(buffer, clientId)
+    buffer.putInt(ackTimeout)
+    buffer.putInt(stopReplicaSet.size)
+    for ((topic, partitionId) <- stopReplicaSet){
+      Utils.writeShortString(buffer, topic, "UTF-8")
+      buffer.putInt(partitionId)
+    }
+  }
+
+  /**
+   * Number of bytes produced by writeTo.
+   *
+   * Strings are sized with Utils.shortStringLength, the same helper
+   * TopicMetadataRequest.sizeInBytes uses, instead of String.length. A character
+   * count under-sizes the buffer whenever a topic or client id encodes to more
+   * than one byte per character.
+   * NOTE(review): assumes shortStringLength includes the 2-byte length prefix and
+   * uses the same encoding as writeShortString -- confirm against kafka.utils.Utils.
+   */
+  def sizeInBytes(): Int = {
+    val fixedSize =
+      2 /* version id */ + Utils.shortStringLength(clientId) + 4 /* ack timeout */ + 4 /* pair count */
+    stopReplicaSet.foldLeft(fixedSize) { case (size, (topic, _)) =>
+      size + Utils.shortStringLength(topic) + 4 /* partition id */
+    }
+  }
+}
\ No newline at end of file
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala Wed Jun 6 01:28:46 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.utils.Utils
+import collection.mutable.HashMap
+import collection.mutable.Map
+import kafka.common.ErrorMapping
+
+
+/**
+ * Wire-format decoder for StopReplicaResponse.
+ */
+object StopReplicaResponse {
+
+  /**
+   * Decodes a StopReplicaResponse from `buffer`. Fields are consumed in this order:
+   * version id (short), error code (short), entry count (int), then for every entry
+   * a topic (short string), a partition id (int) and that partition's error code (short).
+   */
+  def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
+    val version = buffer.getShort
+    val error = buffer.getShort
+    val entryCount = buffer.getInt
+
+    val statuses = new HashMap[(String, Int), Short]()
+    (0 until entryCount).foreach { _ =>
+      val topic = Utils.readShortString(buffer, "UTF-8")
+      val partition = buffer.getInt
+      val status = buffer.getShort()
+      statuses.put((topic, partition), status)
+    }
+    new StopReplicaResponse(version, statuses, error)
+  }
+}
+
+
+/**
+ * Response message carrying one error code per (topic, partition) plus an overall error code.
+ *
+ * @param versionId   protocol version written as the first field
+ * @param responseMap error code for each (topic, partition id)
+ * @param errorCode   overall error code; defaults to ErrorMapping.NoError
+ */
+case class StopReplicaResponse(val versionId: Short,
+                               val responseMap: Map[(String, Int), Short],
+                               val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+
+  /**
+   * Number of bytes produced by writeTo.
+   *
+   * Topics are sized with Utils.shortStringLength, the same helper
+   * TopicMetadataRequest.sizeInBytes uses, instead of String.length. A character
+   * count under-sizes the buffer whenever a topic encodes to more than one byte
+   * per character.
+   * NOTE(review): assumes shortStringLength includes the 2-byte length prefix and
+   * uses the same encoding as writeShortString -- confirm against kafka.utils.Utils.
+   */
+  def sizeInBytes: Int ={
+    val fixedSize = 2 /* version id */ + 2 /* error code */ + 4 /* entry count */
+    responseMap.keys.foldLeft(fixedSize) { case (size, (topic, _)) =>
+      size + Utils.shortStringLength(topic) + 4 /* partition id */ + 2 /* partition error code */
+    }
+  }
+
+  /**
+   * Writes the response in the order expected by StopReplicaResponse.readFrom:
+   * version id, error code, entry count, then each topic, partition id and error code.
+   */
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putShort(errorCode)
+    buffer.putInt(responseMap.size)
+    for (((topic, partitionId), partitionErrorCode) <- responseMap){
+      Utils.writeShortString(buffer, topic, "UTF-8")
+      buffer.putInt(partitionId)
+      buffer.putShort(partitionErrorCode)
+    }
+  }
+}
\ No newline at end of file
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala Wed Jun 6 01:28:46 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.common.ErrorMapping
+
+
+/**
+ * Wire-format decoder for TopicMetaDataResponse.
+ */
+object TopicMetaDataResponse {
+
+  /**
+   * Decodes a TopicMetaDataResponse from `buffer`. Fields are consumed in this order:
+   * version id (short), error code (short), topic count (int), then one TopicMetadata
+   * entry per topic.
+   */
+  def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = {
+    // The read order must mirror TopicMetaDataResponse.writeTo, which puts the version id
+    // before the error code. Reading them the other way round swaps the two fields.
+    val versionId = buffer.getShort
+    val errorCode = buffer.getShort
+
+    val topicCount = buffer.getInt
+    val topicsMetadata = new Array[TopicMetadata](topicCount)
+    for( i <- 0 until topicCount) {
+      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
+    }
+    new TopicMetaDataResponse(versionId, topicsMetadata.toSeq, errorCode)
+  }
+}
+
+/**
+ * Response message carrying the metadata of a sequence of topics.
+ *
+ * @param versionId      protocol version written as the first field
+ * @param topicsMetadata metadata entries, one per topic
+ * @param errorCode      error code; defaults to ErrorMapping.NoError
+ */
+case class TopicMetaDataResponse(versionId: Short,
+                                 topicsMetadata: Seq[TopicMetadata],
+                                 errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse
+{
+  /* version id (2) + error code (2) + topic count (4) + the size of every metadata entry */
+  val sizeInBytes = topicsMetadata.foldLeft(2 + 2 + 4)((total, metadata) => total + metadata.sizeInBytes)
+
+  /** Writes the version id, the error code, the topic count and then every metadata entry. */
+  def writeTo(buffer: ByteBuffer) {
+    /* version id */
+    buffer.putShort(versionId)
+    /* error code */
+    buffer.putShort(errorCode)
+    /* number of topics, followed by the metadata of each topic */
+    buffer.putInt(topicsMetadata.length)
+    for (metadata <- topicsMetadata)
+      metadata.writeTo(buffer)
+  }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Wed Jun 6 01:28:46 2012
@@ -19,16 +19,16 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Utils._
-import kafka.network.{Send, Request}
-import java.nio.channels.GatheringByteChannel
-import kafka.common.ErrorMapping
import collection.mutable.ListBuffer
+import kafka.utils._
sealed trait DetailedMetadataRequest { def requestId: Short }
case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] }
object TopicMetadataRequest {
+ val CurrentVersion = 1.shortValue()
+ val DefaultClientId = ""
/**
* TopicMetadataRequest has the following format -
@@ -48,6 +48,8 @@ object TopicMetadataRequest {
}
def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
+ val versionId = buffer.getShort
+ val clientId = Utils.readShortString(buffer)
val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
val topics = new ListBuffer[String]()
for(i <- 0 until numTopics)
@@ -66,37 +68,26 @@ object TopicMetadataRequest {
}
debug("topic = %s, detailed metadata request = %d"
.format(topicsList.head, returnDetailedMetadata.requestId))
- new TopicMetadataRequest(topics.toList, returnDetailedMetadata, timestamp, count)
- }
-
- def serializeTopicMetadata(topicMetadata: Seq[TopicMetadata]): ByteBuffer = {
- val size = topicMetadata.foldLeft(4 /* num topics */)(_ + _.sizeInBytes)
- val buffer = ByteBuffer.allocate(size)
- debug("Allocating buffer of size %d for topic metadata response".format(size))
- /* number of topics */
- buffer.putInt(topicMetadata.size)
- /* topic partition_metadata */
- topicMetadata.foreach(m => m.writeTo(buffer))
- buffer.rewind()
- buffer
- }
-
- def deserializeTopicsMetadataResponse(buffer: ByteBuffer): Seq[TopicMetadata] = {
- /* number of topics */
- val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
- val topicMetadata = new Array[TopicMetadata](numTopics)
- for(i <- 0 until numTopics)
- topicMetadata(i) = TopicMetadata.readFrom(buffer)
- topicMetadata
+ new TopicMetadataRequest(versionId, clientId, topics.toList, returnDetailedMetadata, timestamp, count)
}
}
-case class TopicMetadataRequest(val topics: Seq[String],
+case class TopicMetadataRequest(val versionId: Short,
+ val clientId: String,
+ val topics: Seq[String],
val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
val timestamp: Option[Long] = None, val count: Option[Int] = None)
- extends Request(RequestKeys.TopicMetadata){
+ extends RequestOrResponse(Some(RequestKeys.TopicMetadata)){
+
+def this(topics: Seq[String]) =
+ this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)
+
+
+
def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ Utils.writeShortString(buffer, clientId)
buffer.putInt(topics.size)
topics.foreach(topic => writeShortString(buffer, topic))
buffer.putShort(detailedMetadata.requestId)
@@ -110,7 +101,7 @@ case class TopicMetadataRequest(val topi
}
def sizeInBytes(): Int = {
- var size: Int = 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
+ var size: Int = 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
2 /* detailed metadata */
detailedMetadata match {
case SegmentMetadata =>
@@ -121,34 +112,3 @@ case class TopicMetadataRequest(val topi
size
}
}
-
-class TopicMetadataSend(topicsMetadata: Seq[TopicMetadata]) extends Send {
- private var size: Int = topicsMetadata.foldLeft(4)(_ + _.sizeInBytes)
- private val header = ByteBuffer.allocate(6)
- header.putInt(size + 2)
- header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
- header.rewind()
-
- val metadata = TopicMetadataRequest.serializeTopicMetadata(topicsMetadata)
- metadata.rewind()
-
- trace("Wrote size %d in header".format(size + 2))
- var complete: Boolean = false
-
- def writeTo(channel: GatheringByteChannel): Int = {
- expectIncomplete()
- var written = 0
- if(header.hasRemaining)
- written += channel.write(header)
- trace("Wrote %d bytes for header".format(written))
-
- if(!header.hasRemaining && metadata.hasRemaining)
- written += channel.write(metadata)
-
- trace("Wrote %d bytes for header and metadata".format(written))
-
- if(!metadata.hasRemaining)
- complete = true
- written
- }
-}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Wed Jun 6 01:28:46 2012
@@ -28,36 +28,36 @@ import scala.Predef._
object ErrorMapping {
val EmptyByteBuffer = ByteBuffer.allocate(0)
- val UnknownCode = -1
- val NoError = 0
- val OffsetOutOfRangeCode = 1
- val InvalidMessageCode = 2
- val InvalidPartitionCode = 3
- val InvalidFetchSizeCode = 4
- val InvalidFetchRequestFormatCode = 5
- val NotLeaderForPartitionCode = 6
- val NoLeaderForPartitionCode = 7
- val UnknownTopicCode = 8
+ val UnknownCode : Short = -1
+ val NoError : Short = 0
+ val OffsetOutOfRangeCode : Short = 1
+ val InvalidMessageCode : Short = 2
+ val InvalidPartitionCode : Short = 3
+ val InvalidFetchSizeCode : Short = 4
+ val InvalidFetchRequestFormatCode : Short = 5
+ val NoLeaderForPartitionCode : Short = 6
+ val NotLeaderForPartitionCode : Short = 7
+ val UnknownTopicCode : Short = 8
private val exceptionToCode =
- Map[Class[Throwable], Int](
+ Map[Class[Throwable], Short](
classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> InvalidPartitionCode,
classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
- classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode
-// classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
+ classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
+ classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
).withDefaultValue(UnknownCode)
/* invert the mapping */
private val codeToException =
- (Map[Int, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
+ (Map[Short, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
- def codeFor(exception: Class[Throwable]): Int = exceptionToCode(exception)
+ def codeFor(exception: Class[Throwable]): Short = exceptionToCode(exception)
- def maybeThrowException(code: Int) =
+ def maybeThrowException(code: Short) =
if(code != 0)
throw codeToException(code).newInstance()
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Wed Jun 6 01:28:46 2012
@@ -57,10 +57,10 @@ class SimpleConsumer( val host: String,
}
}
- private def sendRequest(request: Request): Tuple2[Receive, Int] = {
+ private def sendRequest(request: RequestOrResponse): Receive = {
lock synchronized {
getOrMakeConnection()
- var response: Tuple2[Receive,Int] = null
+ var response: Receive = null
try {
blockingChannel.send(request)
response = blockingChannel.receive()
@@ -92,7 +92,7 @@ class SimpleConsumer( val host: String,
def fetch(request: FetchRequest): FetchResponse = {
val startTime = SystemTime.nanoseconds
val response = sendRequest(request)
- val fetchResponse = FetchResponse.readFrom(response._1.buffer)
+ val fetchResponse = FetchResponse.readFrom(response.buffer)
val fetchedSize = fetchResponse.sizeInBytes
val endTime = SystemTime.nanoseconds
@@ -112,7 +112,7 @@ class SimpleConsumer( val host: String,
def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
val response = sendRequest(request)
- OffsetRequest.deserializeOffsetArray(response._1.buffer)
+ OffsetResponse.readFrom(response.buffer).offsets
}
private def getOrMakeConnection() {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Wed Jun 6 01:28:46 2012
@@ -16,7 +16,7 @@
*/
package kafka.javaapi
-import kafka.network.Request
+import kafka.api.RequestOrResponse
import kafka.api.{RequestKeys, TopicData}
import java.nio.ByteBuffer
@@ -24,7 +24,7 @@ class ProducerRequest(val correlationId:
val clientId: String,
val requiredAcks: Short,
val ackTimeout: Int,
- val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
+ val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Wed Jun 6 01:28:46 2012
@@ -22,7 +22,7 @@ import kafka.message._
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
- private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
+ private val errorCode: Short = ErrorMapping.NoError) extends MessageSet {
val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
initialOffset,
errorCode)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Wed Jun 6 01:28:46 2012
@@ -35,7 +35,7 @@ import kafka.common.{MessageSizeTooLarge
*/
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
- private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
+ private val errorCode: Short = ErrorMapping.NoError) extends MessageSet with Logging {
private var shallowValidByteCount = -1L
if(sizeInBytes > Int.MaxValue)
throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala Wed Jun 6 01:28:46 2012
@@ -20,6 +20,7 @@ package kafka.network
import java.net.InetSocketAddress
import java.nio.channels._
import kafka.utils.{nonthreadsafe, Logging}
+import kafka.api.RequestOrResponse
/**
* A simple blocking channel with timeouts correctly enabled.
@@ -70,7 +71,7 @@ class BlockingChannel( val host: String,
def isConnected = connected
- def send(request: Request):Int = {
+ def send(request: RequestOrResponse):Int = {
if(!connected)
throw new ClosedChannelException()
@@ -78,16 +79,14 @@ class BlockingChannel( val host: String,
send.writeCompletely(writeChannel)
}
- def receive(): Tuple2[Receive, Int] = {
+ def receive(): Receive = {
if(!connected)
throw new ClosedChannelException()
val response = new BoundedByteBufferReceive()
response.readCompletely(readChannel)
- // this has the side effect of setting the initial position of buffer correctly
- val errorCode: Int = response.buffer.getShort
- (response, errorCode)
+ response
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala Wed Jun 6 01:28:46 2012
@@ -20,6 +20,7 @@ package kafka.network
import java.nio._
import java.nio.channels._
import kafka.utils._
+import kafka.api.RequestOrResponse
@nonthreadsafe
private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send {
@@ -37,12 +38,18 @@ private[kafka] class BoundedByteBufferSe
def this(size: Int) = this(ByteBuffer.allocate(size))
- def this(request: Request) = {
- this(request.sizeInBytes + 2)
- buffer.putShort(request.id)
+ def this(request: RequestOrResponse) = {
+ this(request.sizeInBytes + (if(request.requestId != None) 2 else 0))
+ request.requestId match {
+ case Some(requestId) =>
+ buffer.putShort(requestId)
+ case None =>
+ }
+
request.writeTo(buffer)
buffer.rewind()
}
+
def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Wed Jun 6 01:28:46 2012
@@ -19,7 +19,7 @@ package kafka.producer
import kafka.api._
import kafka.message.MessageSet
-import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive}
+import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
import kafka.utils._
import java.util.Random
import kafka.common.MessageSizeTooLargeException
@@ -46,7 +46,7 @@ class SyncProducer(val config: SyncProdu
trace("Instantiating Scala Sync Producer")
- private def verifyRequest(request: Request) = {
+ private def verifyRequest(request: RequestOrResponse) = {
/**
* This seems a little convoluted, but the idea is to turn on verification simply changing log4j settings
* Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
@@ -66,13 +66,13 @@ class SyncProducer(val config: SyncProdu
/**
* Common functionality for the public send methods
*/
- private def doSend(request: Request): Tuple2[Receive, Int] = {
+ private def doSend(request: RequestOrResponse): Receive = {
lock synchronized {
verifyRequest(request)
val startTime = SystemTime.nanoseconds
getOrMakeConnection()
- var response: Tuple2[Receive, Int] = null
+ var response: Receive = null
try {
blockingChannel.send(request)
response = blockingChannel.receive()
@@ -108,12 +108,13 @@ class SyncProducer(val config: SyncProdu
}
}
val response = doSend(producerRequest)
- ProducerResponse.deserializeResponse(response._1.buffer)
+ ProducerResponse.readFrom(response.buffer)
}
def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
val response = doSend(request)
- TopicMetadataRequest.deserializeTopicsMetadataResponse(response._1.buffer)
+ val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer)
+ topicMetaDataResponse.topicsMetadata
}
def close() = {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Jun 6 01:28:46 2012
@@ -28,8 +28,11 @@ import kafka.network._
import kafka.utils.{SystemTime, Logging}
import org.apache.log4j.Logger
import scala.collection._
+import mutable.HashMap
import scala.math._
import java.lang.IllegalStateException
+import kafka.network.RequestChannel.Response
+
/**
* Logic to handle the various Kafka requests
@@ -50,10 +53,40 @@ class KafkaApis(val requestChannel: Requ
case RequestKeys.Fetch => handleFetchRequest(request)
case RequestKeys.Offsets => handleOffsetRequest(request)
case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request)
+ case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request)
+ case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request)
case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
}
}
+
+ /**
+  * Handle a leader-and-ISR request.
+  * Placeholder for now: every entry of the request is answered with ErrorMapping.NoError.
+  */
+ def handleLeaderAndISRRequest(request: RequestChannel.Request){
+   val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
+   val statusByPartition = new HashMap[(String, Int), Short]
+
+   // TODO: put in actual logic later
+   leaderAndISRRequest.leaderAndISRInfos.foreach { case (partitionKey, _) =>
+     statusByPartition.put(partitionKey, ErrorMapping.NoError)
+   }
+
+   val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, statusByPartition)
+   val send = new BoundedByteBufferSend(leaderAndISRResponse)
+   requestChannel.sendResponse(new Response(request, send, -1))
+ }
+
+
+ /**
+  * Handle a stop-replica request.
+  * Placeholder for now: every (topic, partition) of the request is answered with ErrorMapping.NoError.
+  */
+ def handleStopReplicaRequest(request: RequestChannel.Request){
+   val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
+   val statusByPartition = new HashMap[(String, Int), Short]
+
+   // TODO: put in actual logic later
+   stopReplicaRequest.stopReplicaSet.foreach { topicAndPartition =>
+     statusByPartition.put(topicAndPartition, ErrorMapping.NoError)
+   }
+
+   val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, statusByPartition)
+   val send = new BoundedByteBufferSend(stopReplicaResponse)
+   requestChannel.sendResponse(new Response(request, send, -1))
+ }
+
+
/**
* Handle a produce request
*/
@@ -65,7 +98,7 @@ class KafkaApis(val requestChannel: Requ
val response = produce(produceRequest)
debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
- requestChannel.sendResponse(new RequestChannel.Response(request, new ProducerResponseSend(response), -1))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
// Now check any outstanding fetches this produce just unblocked
var satisfied = new mutable.ArrayBuffer[DelayedFetch]
@@ -77,7 +110,7 @@ class KafkaApis(val requestChannel: Requ
for(fetchReq <- satisfied) {
val topicData = readMessageSets(fetchReq.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
- requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
+ requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response), -1))
}
}
@@ -115,7 +148,7 @@ class KafkaApis(val requestChannel: Requ
}
}
}
- new ProducerResponse(ProducerResponse.CurrentVersion, request.correlationId, errors, offsets)
+ new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
}
/**
@@ -131,9 +164,8 @@ class KafkaApis(val requestChannel: Requ
fetchRequest.validate()
} catch {
case e:FetchRequestFormatException =>
- val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, Array.empty)
- val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response,
- ErrorMapping.InvalidFetchRequestFormatCode), -1)
+ val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty)
+ val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response), -1)
requestChannel.sendResponse(channelResponse)
}
@@ -147,7 +179,7 @@ class KafkaApis(val requestChannel: Requ
debug("Returning fetch response %s for fetch request with correlation id %d"
.format(topicData.map(_.partitionData.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
- requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response), -1))
} else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
val keys: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
@@ -240,8 +272,8 @@ class KafkaApis(val requestChannel: Requ
/**
* Read from a single topic/partition at the given offset
*/
- private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = {
- var response: Either[Int, MessageSet] = null
+ private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = {
+ var response: Either[Short, MessageSet] = null
try {
// check if the current broker is the leader for the partitions
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
@@ -264,8 +296,8 @@ class KafkaApis(val requestChannel: Requ
if(requestLogger.isTraceEnabled)
requestLogger.trace("Offset request " + offsetRequest.toString)
val offsets = logManager.getOffsets(offsetRequest)
- val response = new OffsetArraySend(offsets)
- requestChannel.sendResponse(new RequestChannel.Response(request, response, -1))
+ val response = new OffsetResponse(offsetRequest.versionId, offsets)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
}
/**
@@ -303,7 +335,8 @@ class KafkaApis(val requestChannel: Requ
}
}
info("Sending response for topic metadata request")
- requestChannel.sendResponse(new RequestChannel.Response(request, new TopicMetadataSend(topicsMetadata), -1))
+ val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
}
def close() {
@@ -337,7 +370,7 @@ class KafkaApis(val requestChannel: Requ
def expire(delayed: DelayedFetch) {
val topicData = readMessageSets(delayed.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
- requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
+ requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response), -1))
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala Wed Jun 6 01:28:46 2012
@@ -29,13 +29,13 @@ import kafka.common.ErrorMapping
* wholly in kernel space
*/
@nonthreadsafe
-private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Int) extends Send {
+private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send {
private var sent: Long = 0
private var size: Long = messages.sizeInBytes
private val header = ByteBuffer.allocate(6)
header.putInt(size.asInstanceOf[Int] + 2)
- header.putShort(errorCode.asInstanceOf[Short])
+ header.putShort(errorCode)
header.rewind()
var complete: Boolean = false
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala?rev=1346697&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala Wed Jun 6 01:28:46 2012
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.controller
+
+
+import org.scalatest.junit.JUnit3Suite
+
+import junit.framework.Assert._
+import java.nio.ByteBuffer
+import kafka.common.ErrorMapping
+import kafka.api._
+import collection.mutable.Map
+import collection.mutable.Set
+import kafka.integration.KafkaServerTestHarness
+import kafka.utils.TestUtils
+import kafka.server.KafkaConfig
+import kafka.network.{Receive, BlockingChannel}
+
+
+/**
+ * Tests the controller-to-broker admin requests (LeaderAndISR and StopReplica): a
+ * serialization round trip for every request/response type, plus an end-to-end
+ * exchange over a BlockingChannel connected to localhost on configs.head.port.
+ */
+class ControllerToBrokerRequestTest extends JUnit3Suite with KafkaServerTestHarness {
+
+  val kafkaProps = TestUtils.createBrokerConfigs(1)
+  val configs = List(new KafkaConfig(kafkaProps.head))
+  var blockingChannel: BlockingChannel = null
+
+  /** Runs the harness set-up, then opens the channel used by the end-to-end tests. */
+  override def setUp() {
+    super.setUp()
+    // NOTE(review): meaning of 1000000, 0 and 64*1024 is defined by BlockingChannel; confirm there
+    blockingChannel = new BlockingChannel("localhost", configs.head.port, 1000000, 0, 64*1024)
+    blockingChannel.connect
+  }
+
+  /** Closes the client channel first; the harness tear-down then runs even if that fails. */
+  override def tearDown() {
+    try {
+      if (blockingChannel != null)
+        blockingChannel.disconnect()
+    } finally {
+      super.tearDown()
+    }
+  }
+
+  /** Sample LeaderAndISR request covering partitions 1 and 2 of topics "test1" and "test2". */
+  def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = {
+    val topic1 = "test1"
+    val topic2 = "test2"
+    val leader1 = 1
+    val ISR1 = List(1, 2, 3)
+    val leader2 = 2
+    val ISR2 = List(2, 3, 4)
+    val leaderAndISR1 = new LeaderAndISR(leader1, 1, ISR1, 1)
+    val leaderAndISR2 = new LeaderAndISR(leader2, 1, ISR2, 2)
+    val map = Map(((topic1, 1), leaderAndISR1), ((topic1, 2), leaderAndISR1),
+                  ((topic2, 1), leaderAndISR2), ((topic2, 2), leaderAndISR2))
+
+    new LeaderAndISRRequest(1, "client 1", 1, 4, map)
+  }
+
+  /** Sample LeaderAndISR response reporting ErrorMapping.NoError for the same four partitions. */
+  def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
+    val topic1 = "test1"
+    val topic2 = "test2"
+    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
+                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+
+    new LeaderAndISRResponse(1, responseMap)
+  }
+
+  /** Sample StopReplica request for partitions 1 and 2 of topics "test1" and "test2". */
+  def createSampleStopReplicaRequest() : StopReplicaRequest = {
+    val topic1 = "test1"
+    val topic2 = "test2"
+    new StopReplicaRequest(1, "client 1", 1000, Set((topic1, 1), (topic1, 2),
+                                                   (topic2, 1), (topic2, 2)))
+  }
+
+  /** Sample StopReplica response reporting ErrorMapping.NoError for the same four partitions. */
+  def createSampleStopReplicaResponse() : StopReplicaResponse = {
+    val topic1 = "test1"
+    val topic2 = "test2"
+    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
+                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+
+    new StopReplicaResponse(1, responseMap)
+  }
+
+  /** writeTo followed by readFrom must reproduce an equal LeaderAndISRRequest. */
+  def testLeaderAndISRRequest {
+    val leaderAndISRRequest = createSampleLeaderAndISRRequest()
+    val serializedLeaderAndISRRequest = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes)
+    leaderAndISRRequest.writeTo(serializedLeaderAndISRRequest)
+    serializedLeaderAndISRRequest.rewind()
+    val deserializedLeaderAndISRRequest = LeaderAndISRRequest.readFrom(serializedLeaderAndISRRequest)
+    assertEquals(leaderAndISRRequest, deserializedLeaderAndISRRequest)
+  }
+
+  /** writeTo followed by readFrom must reproduce an equal LeaderAndISRResponse. */
+  def testLeaderAndISRResponse {
+    val leaderAndISRResponse = createSampleLeaderAndISRResponse()
+    val serializedLeaderAndISRResponse = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes)
+    leaderAndISRResponse.writeTo(serializedLeaderAndISRResponse)
+    serializedLeaderAndISRResponse.rewind()
+    val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(serializedLeaderAndISRResponse)
+    assertEquals(leaderAndISRResponse, deserializedLeaderAndISRResponse)
+  }
+
+  /** writeTo followed by readFrom must reproduce an equal StopReplicaRequest. */
+  def testStopReplicaRequest {
+    val stopReplicaRequest = createSampleStopReplicaRequest()
+    val serializedStopReplicaRequest = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes)
+    stopReplicaRequest.writeTo(serializedStopReplicaRequest)
+    serializedStopReplicaRequest.rewind()
+    val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(serializedStopReplicaRequest)
+    assertEquals(stopReplicaRequest, deserializedStopReplicaRequest)
+  }
+
+  /** writeTo followed by readFrom must reproduce an equal StopReplicaResponse. */
+  def testStopReplicaResponse {
+    val stopReplicaResponse = createSampleStopReplicaResponse()
+    val serializedStopReplicaResponse = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes)
+    stopReplicaResponse.writeTo(serializedStopReplicaResponse)
+    serializedStopReplicaResponse.rewind()
+    val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(serializedStopReplicaResponse)
+    assertEquals(stopReplicaResponse, deserializedStopReplicaResponse)
+  }
+
+  /** Sends the sample LeaderAndISR request over the channel; the reply must equal the sample response. */
+  def testEndToEndLeaderAndISRRequest {
+    val leaderAndISRRequest = createSampleLeaderAndISRRequest()
+    blockingChannel.send(leaderAndISRRequest)
+    val response: Receive = blockingChannel.receive()
+
+    val leaderAndISRResponse = LeaderAndISRResponse.readFrom(response.buffer)
+    val expectedLeaderAndISRResponse = createSampleLeaderAndISRResponse()
+
+    // JUnit's assertEquals takes (expected, actual), so the expected value goes first
+    assertEquals(expectedLeaderAndISRResponse, leaderAndISRResponse)
+  }
+
+  /** Sends the sample StopReplica request over the channel; the reply must equal the sample response. */
+  def testEndToEndStopReplicaRequest {
+    val stopReplicaRequest = createSampleStopReplicaRequest()
+    blockingChannel.send(stopReplicaRequest)
+    val response: Receive = blockingChannel.receive()
+
+    val stopReplicaResponse = StopReplicaResponse.readFrom(response.buffer)
+    val expectedStopReplicaResponse = createSampleStopReplicaResponse()
+
+    // JUnit's assertEquals takes (expected, actual), so the expected value goes first
+    assertEquals(expectedStopReplicaResponse, stopReplicaResponse)
+  }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Wed Jun 6 01:28:46 2012
@@ -25,12 +25,13 @@ import kafka.log.LogManager
import junit.framework.Assert._
import org.easymock.EasyMock
import kafka.network._
-import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
+import kafka.api.{TopicMetaDataResponse, TopicMetadataRequest}
import kafka.cluster.Broker
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
+
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
@@ -104,10 +105,10 @@ class TopicMetadataTest extends JUnit3Su
// call the API (to be tested) to get metadata
apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
- val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[TopicMetadataSend].metadata
+ val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[BoundedByteBufferSend].buffer
// check assertions
- val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(metadataResponse)
+ val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
val partitionMetadata = topicMetadata.head.partitionsMetadata
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Wed Jun 6 01:28:46 2012
@@ -121,7 +121,7 @@ class ByteBufferMessageSetTest extends B
buffer.put(emptyMessageSet.getSerialized())
buffer.put(regularMessgeSet.getSerialized())
buffer.rewind
- val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
+ val mixedMessageSet = new ByteBufferMessageSet(buffer)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
@@ -142,7 +142,7 @@ class ByteBufferMessageSetTest extends B
buffer.put(emptyMessageSet.getSerialized())
buffer.put(regularMessgeSet.getSerialized())
buffer.rewind
- val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
+ val mixedMessageSet = new ByteBufferMessageSet(buffer)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Wed Jun 6 01:28:46 2012
@@ -382,9 +382,9 @@ class AsyncProducerTest extends JUnit3Su
mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
EasyMock.expectLastCall().andReturn(List(topic1Metadata))
mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5))))
- EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
+ EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5))))
- EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
+ EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
EasyMock.replay(mockSyncProducer)
val producerPool = EasyMock.createMock(classOf[ProducerPool])
@@ -442,9 +442,9 @@ class AsyncProducerTest extends JUnit3Su
// On the third try for partition 0, let it succeed.
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
val response1 =
- new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
+ new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
- val response2 = new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L))
+ val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1346697&r1=1346696&r2=1346697&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Wed Jun 6 01:28:46 2012
@@ -31,7 +31,6 @@ import org.I0Itec.zkclient.ZkClient
import kafka.cluster.Broker
import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
-import scala.collection.Map
import kafka.api.{TopicData, PartitionData}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit