You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:20 UTC

[23/37] git commit: kafka-1462 (followup patch); Add new request and response formats for the new consumer and coordinator communication; patched by Jun Rao; reviewed by Jay Kreps

kafka-1462 (followup patch); Add new request and response formats for the new consumer and coordinator communication; patched by Jun Rao; reviewed by Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/014b700f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/014b700f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/014b700f

Branch: refs/heads/transactional_messaging
Commit: 014b700f0323b4dc00d3aa0e1b598f7e2ed07957
Parents: 1e4b084
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jul 22 10:53:34 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jul 22 10:53:34 2014 -0700

----------------------------------------------------------------------
 .../kafka/api/GenericRequestAndHeader.scala     | 55 ++++++++++++++++++++
 .../api/GenericRequestOrResponseAndHeader.scala | 45 ----------------
 .../kafka/api/GenericResponseAndHeader.scala    | 46 ++++++++++++++++
 .../kafka/api/HeartbeatRequestAndHeader.scala   | 20 ++++---
 .../kafka/api/HeartbeatResponseAndHeader.scala  | 10 ++--
 .../kafka/api/JoinGroupRequestAndHeader.scala   | 17 +++---
 .../kafka/api/JoinGroupResponseAndHeader.scala  | 10 ++--
 .../api/RequestResponseSerializationTest.scala  | 12 ++---
 8 files changed, 139 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
new file mode 100644
index 0000000..f40e19f
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import org.apache.kafka.common.requests.AbstractRequestResponse
+import kafka.api.ApiUtils._
+
+private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
+                                                      val correlationId: Int,
+                                                      val clientId: String,
+                                                      val body: AbstractRequestResponse,
+                                                      val name: String,
+                                                      override val requestId: Option[Short] = None)
+  extends RequestOrResponse(requestId) {
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+    body.writeTo(buffer)
+  }
+
+  def sizeInBytes(): Int = {
+    2 /* version id */ +
+    4 /* correlation id */ +
+    (2 + clientId.length) /* client id */ +
+    body.sizeOf();
+  }
+
+  override def toString(): String = {
+    describe(true)
+  }
+
+  override def describe(details: Boolean): String = {
+    val strBuffer = new StringBuilder
+    strBuffer.append("Name: " + name)
+    strBuffer.append("; Version: " + versionId)
+    strBuffer.append("; CorrelationId: " + correlationId)
+    strBuffer.append("; ClientId: " + clientId)
+    strBuffer.append("; Body: " + body.toString)
+    strBuffer.toString()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
deleted file mode 100644
index fb022e8..0000000
--- a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import org.apache.kafka.common.requests.AbstractRequestResponse
-
-private[kafka] abstract class GenericRequestOrResponseAndHeader(val header: AbstractRequestResponse,
-                                                                val body: AbstractRequestResponse,
-                                                                val name: String,
-                                                                override val requestId: Option[Short] = None)
-  extends RequestOrResponse(requestId) {
-
-  def writeTo(buffer: ByteBuffer) {
-    header.writeTo(buffer)
-    body.writeTo(buffer)
-  }
-
-  def sizeInBytes(): Int = {
-    header.sizeOf() + body.sizeOf();
-  }
-
-  override def toString(): String = {
-    describe(true)
-  }
-
-  override def describe(details: Boolean): String = {
-    val strBuffer = new StringBuilder
-    strBuffer.append("Name: " + name)
-    strBuffer.append("; header: " + header.toString)
-    strBuffer.append("; body: " + body.toString)
-    strBuffer.toString()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
new file mode 100644
index 0000000..a4879e2
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import org.apache.kafka.common.requests.AbstractRequestResponse
+
+private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
+                                                       val body: AbstractRequestResponse,
+                                                       val name: String,
+                                                       override val requestId: Option[Short] = None)
+  extends RequestOrResponse(requestId) {
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    body.writeTo(buffer)
+  }
+
+  def sizeInBytes(): Int = {
+    4 /* correlation id */ +
+    body.sizeOf();
+  }
+
+  override def toString(): String = {
+    describe(true)
+  }
+
+  override def describe(details: Boolean): String = {
+    val strBuffer = new StringBuilder
+    strBuffer.append("Name: " + name)
+    strBuffer.append("; CorrelationId: " + correlationId)
+    strBuffer.append("; Body: " + body.toString)
+    strBuffer.toString()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
index 932418b..f168d9f 100644
--- a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
@@ -16,24 +16,30 @@ package kafka.api
 import java.nio.ByteBuffer
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.common.ErrorMapping
+import org.apache.kafka.common.requests.{HeartbeatResponse, HeartbeatRequest}
+import kafka.api.ApiUtils._
 import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.requests.{HeartbeatResponse, ResponseHeader, HeartbeatRequest, RequestHeader}
+import scala.Some
 
 object HeartbeatRequestAndHeader {
   def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = {
-    val header = RequestHeader.parse(buffer)
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
     val body = HeartbeatRequest.parse(buffer)
-    new HeartbeatRequestAndHeader(header, body)
+    new HeartbeatRequestAndHeader(versionId, correlationId, clientId, body)
   }
 }
 
-case class HeartbeatRequestAndHeader(override val header: RequestHeader, override  val body: HeartbeatRequest)
-  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) {
+case class HeartbeatRequestAndHeader(override val versionId: Short,
+                                     override val correlationId: Int,
+                                     override val clientId: String,
+                                     override val body: HeartbeatRequest)
+  extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) {
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponseHeader = new ResponseHeader(header.correlationId)
     val errorResponseBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(errorResponseHeader, errorResponseBody)
+    val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(correlationId, errorResponseBody)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader)))
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
index 556f38d..9a71faa 100644
--- a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
@@ -12,17 +12,17 @@
  */
 package kafka.api
 
-import org.apache.kafka.common.requests.{ResponseHeader, HeartbeatResponse}
+import org.apache.kafka.common.requests.HeartbeatResponse
 import java.nio.ByteBuffer
 
 object HeartbeatResponseAndHeader {
   def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = {
-    val header = ResponseHeader.parse(buffer)
+    val correlationId = buffer.getInt
     val body = HeartbeatResponse.parse(buffer)
-    new HeartbeatResponseAndHeader(header, body)
+    new HeartbeatResponseAndHeader(correlationId, body)
   }
 }
 
-case class HeartbeatResponseAndHeader(override val header: ResponseHeader, override val body: HeartbeatResponse)
-  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) {
+case class HeartbeatResponseAndHeader(override val correlationId: Int, override val body: HeartbeatResponse)
+  extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) {
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
index 9aea28c..3651e86 100644
--- a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
@@ -17,24 +17,29 @@ import java.nio.ByteBuffer
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.common.ErrorMapping
 import org.apache.kafka.common.requests._
+import kafka.api.ApiUtils._
 import kafka.network.RequestChannel.Response
 import scala.Some
 
 object JoinGroupRequestAndHeader {
   def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = {
-    val header = RequestHeader.parse(buffer)
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
     val body = JoinGroupRequest.parse(buffer)
-    new JoinGroupRequestAndHeader(header, body)
+    new JoinGroupRequestAndHeader(versionId, correlationId, clientId, body)
   }
 }
 
-case class JoinGroupRequestAndHeader(override val header: RequestHeader, override val body: JoinGroupRequest)
-  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) {
+case class JoinGroupRequestAndHeader(override val versionId: Short,
+                                     override val correlationId: Int,
+                                     override val clientId: String,
+                                     override val body: JoinGroupRequest)
+  extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) {
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponseHeader = new ResponseHeader(header.correlationId)
     val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(errorResponseHeader, errorResponseBody)
+    val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(correlationId, errorResponseBody)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader)))
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
index 7389ae6..d0f07e0 100644
--- a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
@@ -12,17 +12,17 @@
  */
 package kafka.api
 
-import org.apache.kafka.common.requests.{JoinGroupResponse, ResponseHeader}
+import org.apache.kafka.common.requests.JoinGroupResponse
 import java.nio.ByteBuffer
 
 object JoinGroupResponseAndHeader {
   def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = {
-    val header = ResponseHeader.parse(buffer)
+    val correlationId = buffer.getInt
     val body = JoinGroupResponse.parse(buffer)
-    new JoinGroupResponseAndHeader(header, body)
+    new JoinGroupResponseAndHeader(correlationId, body)
   }
 }
 
-case class JoinGroupResponseAndHeader(override val header: ResponseHeader, override val body: JoinGroupResponse)
-  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) {
+case class JoinGroupResponseAndHeader(override val correlationId: Int, override val body: JoinGroupResponse)
+  extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) {
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/014b700f/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 847a36b..cd16ced 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -196,29 +196,25 @@ object SerializationTestUtils {
   }
 
   def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = {
-    val header = new RequestHeader(ApiKeys.HEARTBEAT.id, 0.asInstanceOf[Short], "", 1)
     val body = new HeartbeatRequest("group1", 1, "consumer1")
-    HeartbeatRequestAndHeader(header, body)
+    HeartbeatRequestAndHeader(0.asInstanceOf[Short], 1, "", body)
   }
 
   def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = {
-    val header = new ResponseHeader(1)
     val body = new HeartbeatResponse(0.asInstanceOf[Short])
-    HeartbeatResponseAndHeader(header, body)
+    HeartbeatResponseAndHeader(1, body)
   }
 
   def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = {
     import scala.collection.JavaConversions._
-    val header = new RequestHeader(ApiKeys.JOIN_GROUP.id, 0.asInstanceOf[Short], "", 1)
     val body = new JoinGroupRequest("group1", 30000, List("topic1"), "consumer1", "strategy1");
-    JoinGroupRequestAndHeader(header, body)
+    JoinGroupRequestAndHeader(0.asInstanceOf[Short], 1, "", body)
   }
 
   def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = {
     import scala.collection.JavaConversions._
-    val header = new ResponseHeader(1)
     val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1)))
-    JoinGroupResponseAndHeader(header, body)
+    JoinGroupResponseAndHeader(1, body)
   }
 }