You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/07/22 19:53:41 UTC
git commit: kafka-1462 (followup patch);
Add new request and response formats for the new consumer and
coordinator communication; patched by Jun Rao; reviewed by Jay Kreps
Repository: kafka
Updated Branches:
refs/heads/trunk 1e4b0841b -> 014b700f0
kafka-1462 (followup patch); Add new request and response formats for the new consumer and coordinator communication; patched by Jun Rao; reviewed by Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/014b700f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/014b700f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/014b700f
Branch: refs/heads/trunk
Commit: 014b700f0323b4dc00d3aa0e1b598f7e2ed07957
Parents: 1e4b084
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jul 22 10:53:34 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jul 22 10:53:34 2014 -0700
----------------------------------------------------------------------
.../kafka/api/GenericRequestAndHeader.scala | 55 ++++++++++++++++++++
.../api/GenericRequestOrResponseAndHeader.scala | 45 ----------------
.../kafka/api/GenericResponseAndHeader.scala | 46 ++++++++++++++++
.../kafka/api/HeartbeatRequestAndHeader.scala | 20 ++++---
.../kafka/api/HeartbeatResponseAndHeader.scala | 10 ++--
.../kafka/api/JoinGroupRequestAndHeader.scala | 17 +++---
.../kafka/api/JoinGroupResponseAndHeader.scala | 10 ++--
.../api/RequestResponseSerializationTest.scala | 12 ++---
8 files changed, 139 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
new file mode 100644
index 0000000..f40e19f
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import org.apache.kafka.common.requests.AbstractRequestResponse
+import kafka.api.ApiUtils._
+
+private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
+ val correlationId: Int,
+ val clientId: String,
+ val body: AbstractRequestResponse,
+ val name: String,
+ override val requestId: Option[Short] = None)
+ extends RequestOrResponse(requestId) {
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ writeShortString(buffer, clientId)
+ body.writeTo(buffer)
+ }
+
+ def sizeInBytes(): Int = {
+ 2 /* version id */ +
+ 4 /* correlation id */ +
+ (2 + clientId.length) /* client id */ +
+ body.sizeOf();
+ }
+
+ override def toString(): String = {
+ describe(true)
+ }
+
+ override def describe(details: Boolean): String = {
+ val strBuffer = new StringBuilder
+ strBuffer.append("Name: " + name)
+ strBuffer.append("; Version: " + versionId)
+ strBuffer.append("; CorrelationId: " + correlationId)
+ strBuffer.append("; ClientId: " + clientId)
+ strBuffer.append("; Body: " + body.toString)
+ strBuffer.toString()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
deleted file mode 100644
index fb022e8..0000000
--- a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import org.apache.kafka.common.requests.AbstractRequestResponse
-
-private[kafka] abstract class GenericRequestOrResponseAndHeader(val header: AbstractRequestResponse,
- val body: AbstractRequestResponse,
- val name: String,
- override val requestId: Option[Short] = None)
- extends RequestOrResponse(requestId) {
-
- def writeTo(buffer: ByteBuffer) {
- header.writeTo(buffer)
- body.writeTo(buffer)
- }
-
- def sizeInBytes(): Int = {
- header.sizeOf() + body.sizeOf();
- }
-
- override def toString(): String = {
- describe(true)
- }
-
- override def describe(details: Boolean): String = {
- val strBuffer = new StringBuilder
- strBuffer.append("Name: " + name)
- strBuffer.append("; header: " + header.toString)
- strBuffer.append("; body: " + body.toString)
- strBuffer.toString()
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
new file mode 100644
index 0000000..a4879e2
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import org.apache.kafka.common.requests.AbstractRequestResponse
+
+private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
+ val body: AbstractRequestResponse,
+ val name: String,
+ override val requestId: Option[Short] = None)
+ extends RequestOrResponse(requestId) {
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(correlationId)
+ body.writeTo(buffer)
+ }
+
+ def sizeInBytes(): Int = {
+ 4 /* correlation id */ +
+ body.sizeOf();
+ }
+
+ override def toString(): String = {
+ describe(true)
+ }
+
+ override def describe(details: Boolean): String = {
+ val strBuffer = new StringBuilder
+ strBuffer.append("Name: " + name)
+ strBuffer.append("; CorrelationId: " + correlationId)
+ strBuffer.append("; Body: " + body.toString)
+ strBuffer.toString()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
index 932418b..f168d9f 100644
--- a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
@@ -16,24 +16,30 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.common.ErrorMapping
+import org.apache.kafka.common.requests.{HeartbeatResponse, HeartbeatRequest}
+import kafka.api.ApiUtils._
import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.requests.{HeartbeatResponse, ResponseHeader, HeartbeatRequest, RequestHeader}
+import scala.Some
object HeartbeatRequestAndHeader {
def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = {
- val header = RequestHeader.parse(buffer)
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = readShortString(buffer)
val body = HeartbeatRequest.parse(buffer)
- new HeartbeatRequestAndHeader(header, body)
+ new HeartbeatRequestAndHeader(versionId, correlationId, clientId, body)
}
}
-case class HeartbeatRequestAndHeader(override val header: RequestHeader, override val body: HeartbeatRequest)
- extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) {
+case class HeartbeatRequestAndHeader(override val versionId: Short,
+ override val correlationId: Int,
+ override val clientId: String,
+ override val body: HeartbeatRequest)
+ extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) {
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val errorResponseHeader = new ResponseHeader(header.correlationId)
val errorResponseBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(errorResponseHeader, errorResponseBody)
+ val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(correlationId, errorResponseBody)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader)))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
index 556f38d..9a71faa 100644
--- a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
@@ -12,17 +12,17 @@
*/
package kafka.api
-import org.apache.kafka.common.requests.{ResponseHeader, HeartbeatResponse}
+import org.apache.kafka.common.requests.HeartbeatResponse
import java.nio.ByteBuffer
object HeartbeatResponseAndHeader {
def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = {
- val header = ResponseHeader.parse(buffer)
+ val correlationId = buffer.getInt
val body = HeartbeatResponse.parse(buffer)
- new HeartbeatResponseAndHeader(header, body)
+ new HeartbeatResponseAndHeader(correlationId, body)
}
}
-case class HeartbeatResponseAndHeader(override val header: ResponseHeader, override val body: HeartbeatResponse)
- extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) {
+case class HeartbeatResponseAndHeader(override val correlationId: Int, override val body: HeartbeatResponse)
+ extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) {
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
index 9aea28c..3651e86 100644
--- a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
@@ -17,24 +17,29 @@ import java.nio.ByteBuffer
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.common.ErrorMapping
import org.apache.kafka.common.requests._
+import kafka.api.ApiUtils._
import kafka.network.RequestChannel.Response
import scala.Some
object JoinGroupRequestAndHeader {
def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = {
- val header = RequestHeader.parse(buffer)
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = readShortString(buffer)
val body = JoinGroupRequest.parse(buffer)
- new JoinGroupRequestAndHeader(header, body)
+ new JoinGroupRequestAndHeader(versionId, correlationId, clientId, body)
}
}
-case class JoinGroupRequestAndHeader(override val header: RequestHeader, override val body: JoinGroupRequest)
- extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) {
+case class JoinGroupRequestAndHeader(override val versionId: Short,
+ override val correlationId: Int,
+ override val clientId: String,
+ override val body: JoinGroupRequest)
+ extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) {
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val errorResponseHeader = new ResponseHeader(header.correlationId)
val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(errorResponseHeader, errorResponseBody)
+ val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(correlationId, errorResponseBody)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader)))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
index 7389ae6..d0f07e0 100644
--- a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
@@ -12,17 +12,17 @@
*/
package kafka.api
-import org.apache.kafka.common.requests.{JoinGroupResponse, ResponseHeader}
+import org.apache.kafka.common.requests.JoinGroupResponse
import java.nio.ByteBuffer
object JoinGroupResponseAndHeader {
def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = {
- val header = ResponseHeader.parse(buffer)
+ val correlationId = buffer.getInt
val body = JoinGroupResponse.parse(buffer)
- new JoinGroupResponseAndHeader(header, body)
+ new JoinGroupResponseAndHeader(correlationId, body)
}
}
-case class JoinGroupResponseAndHeader(override val header: ResponseHeader, override val body: JoinGroupResponse)
- extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) {
+case class JoinGroupResponseAndHeader(override val correlationId: Int, override val body: JoinGroupResponse)
+ extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) {
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 847a36b..cd16ced 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -196,29 +196,25 @@ object SerializationTestUtils {
}
def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = {
- val header = new RequestHeader(ApiKeys.HEARTBEAT.id, 0.asInstanceOf[Short], "", 1)
val body = new HeartbeatRequest("group1", 1, "consumer1")
- HeartbeatRequestAndHeader(header, body)
+ HeartbeatRequestAndHeader(0.asInstanceOf[Short], 1, "", body)
}
def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = {
- val header = new ResponseHeader(1)
val body = new HeartbeatResponse(0.asInstanceOf[Short])
- HeartbeatResponseAndHeader(header, body)
+ HeartbeatResponseAndHeader(1, body)
}
def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = {
import scala.collection.JavaConversions._
- val header = new RequestHeader(ApiKeys.JOIN_GROUP.id, 0.asInstanceOf[Short], "", 1)
val body = new JoinGroupRequest("group1", 30000, List("topic1"), "consumer1", "strategy1");
- JoinGroupRequestAndHeader(header, body)
+ JoinGroupRequestAndHeader(0.asInstanceOf[Short], 1, "", body)
}
def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = {
import scala.collection.JavaConversions._
- val header = new ResponseHeader(1)
val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1)))
- JoinGroupResponseAndHeader(header, body)
+ JoinGroupResponseAndHeader(1, body)
}
}