You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/15 20:15:34 UTC
git commit: KAFKA-1522;
Transactional messaging request/response definitions; reviewed by Joel Koshy
Repository: kafka
Updated Branches:
refs/heads/transactional_messaging 7a67a7226 -> 45c4de00d
KAFKA-1522; Transactional messaging request/response definitions; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45c4de00
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45c4de00
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45c4de00
Branch: refs/heads/transactional_messaging
Commit: 45c4de00daa0c20cb7718e851f65e46074b0c247
Parents: 7a67a72
Author: Dong Lin <li...@gmail.com>
Authored: Fri Aug 15 11:08:55 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Aug 15 11:08:55 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/api/RequestKeys.scala | 6 +-
.../scala/kafka/api/TransactionRequest.scala | 184 +++++++++++++++++++
.../scala/kafka/api/TransactionResponse.scala | 97 ++++++++++
.../api/TxCoordinatorMetadataRequest.scala | 79 ++++++++
.../api/TxCoordinatorMetadataResponse.scala | 59 ++++++
.../main/scala/kafka/common/ErrorMapping.scala | 1 +
.../api/RequestResponseSerializationTest.scala | 31 +++-
7 files changed, 455 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index c24c034..4a9b174 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -34,6 +34,8 @@ object RequestKeys {
val ConsumerMetadataKey: Short = 10
val JoinGroupKey: Short = 11
val HeartbeatKey: Short = 12
+ val TransactionKey: Short = 13
+ val TxCoordinatorMetadataKey: Short = 14
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -48,7 +50,9 @@ object RequestKeys {
OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
- HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+ HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom),
+ TransactionKey -> ("TransactionRequest", TransactionRequest.readFrom),
+ TxCoordinatorMetadataKey -> ("TxCoordinatorMetadata", TxCoordinatorMetadataRequest.readFrom)
)
def nameForKey(key: Short): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/TransactionRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TransactionRequest.scala b/core/src/main/scala/kafka/api/TransactionRequest.scala
new file mode 100644
index 0000000..29df42c
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TransactionRequest.scala
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import kafka.api.ApiUtils._
+import kafka.common._
+import kafka.network.RequestChannel.Response
+import kafka.network.{RequestChannel, BoundedByteBufferSend}
+import collection.mutable.{LinkedHashMap, LinkedHashSet}
+
+
+object TransactionRequest {
+ val CurrentVersion = 0.shortValue
+
+ def readFrom(buffer: ByteBuffer): TransactionRequest = {
+ val versionId: Short = buffer.getShort
+ val correlationId: Int = buffer.getInt
+ val clientId: String = readShortString(buffer)
+ val ackTimeoutMs: Int = buffer.getInt
+ val requestInfo = TransactionRequestInfo.readFrom(buffer)
+
+ TransactionRequest(versionId, correlationId, clientId, ackTimeoutMs, requestInfo)
+ }
+
+ def transactionRequestWithNewControl(oldTxRequest: TransactionRequest, newTxControl: Short): TransactionRequest = {
+ oldTxRequest.copy(requestInfo = oldTxRequest.requestInfo.copy(txControl = newTxControl))
+ }
+}
+
+object TxRequestTypes {
+ val Ongoing: Short = 0
+ val Begin: Short = 1
+ val PreCommit: Short = 2
+ val Commit: Short = 3
+ val Committed: Short = 4
+ val PreAbort: Short = 5
+ val Abort: Short = 6
+ val Aborted: Short = 7
+}
+
+
+case class TransactionRequest(versionId: Short = TransactionRequest.CurrentVersion,
+ correlationId: Int,
+ clientId: String,
+ ackTimeoutMs: Int,
+ requestInfo: TransactionRequestInfo)
+ extends RequestOrResponse(Some(RequestKeys.TransactionKey)) {
+
+ def this(correlationId: Int,
+ clientId: String,
+ ackTimeoutMs: Int,
+ requestInfo: TransactionRequestInfo) =
+ this(TransactionRequest.CurrentVersion, correlationId, clientId, ackTimeoutMs, requestInfo)
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ writeShortString(buffer, clientId)
+ buffer.putInt(ackTimeoutMs)
+ requestInfo.writeTo(buffer)
+ }
+
+ def sizeInBytes: Int = {
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ shortStringLength(clientId) + /* client id */
+ 4 + /* ackTimeoutMs */
+ requestInfo.sizeInBytes
+ }
+
+ override def toString(): String = {
+ describe(true)
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+
+ val transactionResponseStatus = requestInfo.txPartitions.map {
+ topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ }
+ val errorResponse = TransactionResponse(correlationId, requestInfo.txId, transactionResponseStatus.toMap)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ }
+
+ def responseFor(status: Map[TopicAndPartition, Short]) = {
+ TransactionResponse(correlationId, requestInfo.txId, status);
+ }
+
+ override def describe(details: Boolean): String = {
+ val transactionRequest = new StringBuilder
+ transactionRequest.append("Name: " + this.getClass.getSimpleName)
+ transactionRequest.append("; Version: " + versionId)
+ transactionRequest.append("; CorrelationId: " + correlationId)
+ transactionRequest.append("; ClientId: " + clientId)
+ transactionRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+ if (details)
+ transactionRequest.append("; requestInfos: " + requestInfo)
+ transactionRequest.toString()
+ }
+}
+
+object TransactionRequestInfo {
+
+ def readFrom(buffer: ByteBuffer): TransactionRequestInfo = {
+ val txGroupId: String = readShortString(buffer)
+ val txId: Int = buffer.getInt
+ val txControl: Short = buffer.getShort
+ val txTimeoutMs: Int = buffer.getInt
+
+ val topicCount = buffer.getInt
+ val txPartitions = (1 to topicCount).flatMap(_ => {
+ val topic = readShortString(buffer)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partition = buffer.getInt
+ TopicAndPartition(topic, partition)
+ })
+ }).toList
+
+ TransactionRequestInfo(txGroupId, txId, txControl, txTimeoutMs, txPartitions)
+ }
+}
+
+case class TransactionRequestInfo(txGroupId: String, txId: Int, txControl: Short, txTimeoutMs: Int,
+ txPartitions: Seq[TopicAndPartition]) {
+
+ private lazy val partitionsGroupedByTopic = txPartitions.groupBy(_.topic)
+
+ def sizeInBytes: Int = {
+ shortStringLength(txGroupId) + /* groupId */
+ 4 + /* txId */
+ 2 + /* txControl */
+ 4 + /* txTimeoutMs */
+ 4 + /* number of topics */
+ partitionsGroupedByTopic.foldLeft(0)((foldedTopics, topicAndPartitions) => {
+ foldedTopics +
+ shortStringLength(topicAndPartitions._1) + /* topic */
+ 4 + /* number of partitions */
+ 4 * topicAndPartitions._2.size /* partitions */
+ })
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ writeShortString(buffer, txGroupId)
+ buffer.putInt(txId)
+ buffer.putShort(txControl)
+ buffer.putInt(txTimeoutMs)
+ buffer.putInt(partitionsGroupedByTopic.size)
+ partitionsGroupedByTopic.foreach {
+ case (topic, topicAndPartitions) =>
+ writeShortString(buffer, topic) //write the topic
+ buffer.putInt(topicAndPartitions.size) //the number of partitions
+ topicAndPartitions.foreach {topicAndPartition: TopicAndPartition =>
+ buffer.putInt(topicAndPartition.partition)
+ }
+ }
+ }
+
+ override def toString(): String = {
+ val requestInfo = new StringBuilder
+ requestInfo.append("gId: " + txGroupId)
+ requestInfo.append("; txId: " + txId)
+ requestInfo.append("; txControl: " + txControl)
+ requestInfo.append("; txTimeoutMs: " + txTimeoutMs)
+ requestInfo.append("; TopicAndPartition: (" + txPartitions.mkString(",") + ")")
+ requestInfo.toString()
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/TransactionResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TransactionResponse.scala b/core/src/main/scala/kafka/api/TransactionResponse.scala
new file mode 100644
index 0000000..2a889c7
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TransactionResponse.scala
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import kafka.common.{TopicAndPartition, ErrorMapping}
+
+import scala.collection.Map
+
+object TransactionResponse {
+ def readFrom(buffer: ByteBuffer): TransactionResponse = {
+
+ val correlationId = buffer.getInt
+ val txId = buffer.getInt
+ val topicCount = buffer.getInt
+ val statusPairs = (1 to topicCount).flatMap(_ => {
+ val topic = readShortString(buffer)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partition = buffer.getInt
+ val error = buffer.getShort
+ (TopicAndPartition(topic, partition), error)
+ })
+ })
+ TransactionResponse(correlationId, txId, Map(statusPairs:_*))
+ }
+}
+
+
+case class TransactionResponse(correlationId: Int,
+ txId: Int,
+ status: Map[TopicAndPartition, Short])
+ extends RequestOrResponse() {
+
+ private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
+
+ def hasError = status.values.exists(_ != ErrorMapping.NoError)
+
+ val sizeInBytes = {
+ val groupedStatus = statusGroupedByTopic
+ 4 + /* correlation id */
+ 4 + /* transaction id */
+ 4 + /* topic count */
+ groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
+ foldedTopics +
+ shortStringLength(currTopic._1) +
+ 4 + /* partition count for this topic */
+ currTopic._2.size * {
+ 4 + /* partition id */
+ 2 /* error code */
+ }
+ })
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ val groupedStatus = statusGroupedByTopic
+ buffer.putInt(correlationId)
+ buffer.putInt(txId)
+ buffer.putInt(groupedStatus.size) // topic count
+
+ groupedStatus.foreach(topicStatus => {
+ val (topic, errors) = topicStatus
+ writeShortString(buffer, topic)
+ buffer.putInt(errors.size) // partition count
+ errors.foreach {
+ case (TopicAndPartition(_, partition), error) =>
+ buffer.putInt(partition)
+ buffer.putShort(error)
+ }
+ })
+ }
+
+ override def toString(): String = {
+ val requestInfo = new StringBuilder
+ requestInfo.append("txId: " + txId)
+ requestInfo.append("; status: " + status)
+ requestInfo.toString()
+ }
+
+ override def describe(details: Boolean):String = { toString }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala
new file mode 100644
index 0000000..a154c57
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+import kafka.common.ErrorMapping
+
+object TxCoordinatorMetadataRequest {
+ val CurrentVersion = 0.shortValue
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer) = {
+ // envelope
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = ApiUtils.readShortString(buffer)
+
+ // request
+ val txGroupId = ApiUtils.readShortString(buffer)
+ TxCoordinatorMetadataRequest(txGroupId, versionId, correlationId, clientId)
+ }
+
+}
+
+case class TxCoordinatorMetadataRequest(txGroupId: String,
+ versionId: Short = TxCoordinatorMetadataRequest.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = TxCoordinatorMetadataRequest.DefaultClientId)
+ extends RequestOrResponse(Some(RequestKeys.TxCoordinatorMetadataKey)) {
+
+ def sizeInBytes =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ ApiUtils.shortStringLength(clientId) +
+ ApiUtils.shortStringLength(txGroupId)
+
+ def writeTo(buffer: ByteBuffer) {
+ // envelope
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ ApiUtils.writeShortString(buffer, clientId)
+
+ // transaction coordinator metadata request
+ ApiUtils.writeShortString(buffer, txGroupId)
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ // return TransactionCoordinatorNotAvailable for all uncaught errors
+ val errorResponse = TxCoordinatorMetadataResponse(None, ErrorMapping.TxCoordinatorNotAvailableCode)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ }
+
+ def describe(details: Boolean) = {
+ val transactionMetadataRequest = new StringBuilder
+ transactionMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+ transactionMetadataRequest.append("; Version: " + versionId)
+ transactionMetadataRequest.append("; CorrelationId: " + correlationId)
+ transactionMetadataRequest.append("; ClientId: " + clientId)
+ transactionMetadataRequest.append("; Group: " + txGroupId)
+ transactionMetadataRequest.toString()
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala
new file mode 100644
index 0000000..052b29b
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+
+object TxCoordinatorMetadataResponse {
+ val CurrentVersion = 0
+
+ private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1))
+
+ def readFrom(buffer: ByteBuffer) = {
+ val correlationId = buffer.getInt
+ val errorCode = buffer.getShort
+ val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
+ Some(Broker.readFrom(buffer))
+ else
+ None
+
+ TxCoordinatorMetadataResponse(coordinatorOpt, errorCode, correlationId)
+ }
+
+}
+
+case class TxCoordinatorMetadataResponse(coordinatorOpt: Option[Broker],
+ errorCode: Short,
+ correlationId: Int = 0)
+ extends RequestOrResponse() {
+
+ def sizeInBytes =
+ 4 + /* correlationId */
+ 2 + /* error code */
+ coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).get.sizeInBytes
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(correlationId)
+ buffer.putShort(errorCode)
+ coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer))
+ }
+
+ def describe(details: Boolean) = toString
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 5559d26..b185421 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -46,6 +46,7 @@ object ErrorMapping {
val OffsetsLoadInProgressCode: Short = 14
val ConsumerCoordinatorNotAvailableCode: Short = 15
val NotCoordinatorForConsumerCode: Short = 16
+ val TxCoordinatorNotAvailableCode: Short = 17
private val exceptionToCode =
Map[Class[Throwable], Short](
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..e3bb7a6 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -216,6 +217,27 @@ object SerializationTestUtils {
val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1)))
JoinGroupResponseAndHeader(1, body)
}
+
+ def createTestTransactionRequest: TransactionRequest = {
+ new TransactionRequest(1, "client 1", 1000,
+ TransactionRequestInfo("group 1", 1, 1, 1000, Seq(
+ TopicAndPartition(topic1, 0), TopicAndPartition(topic2, 0)
+ )))
+ }
+
+ def createTestTransactionResponse: TransactionResponse = {
+ val responseMap = Map((TopicAndPartition(topic1, 0), ErrorMapping.NoError),
+ (TopicAndPartition(topic2, 0), ErrorMapping.NoError))
+ TransactionResponse(1, 1,responseMap)
+ }
+
+ def createTestTxCoordinatorMetadataRequest: TxCoordinatorMetadataRequest = {
+ TxCoordinatorMetadataRequest("txGroup 1", clientId = "client 1")
+ }
+
+ def createTestTxCoordinatorMetadataResponse: TxCoordinatorMetadataResponse = {
+ TxCoordinatorMetadataResponse(Some(brokers.head), ErrorMapping.NoError)
+ }
}
class RequestResponseSerializationTest extends JUnitSuite {
@@ -242,6 +264,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader
private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader
private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader
+ private val transactionRequest = SerializationTestUtils.createTestTransactionRequest
+ private val transactionResponse = SerializationTestUtils.createTestTransactionResponse
+ private val txCoordinatorMetadataRequest = SerializationTestUtils.createTestTxCoordinatorMetadataRequest
+ private val txCoordinatorMetadataResponse = SerializationTestUtils.createTestTxCoordinatorMetadataResponse
@Test
def testSerializationAndDeserialization() {
@@ -254,7 +280,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
consumerMetadataRequest, consumerMetadataResponse,
consumerMetadataResponseNoCoordinator, heartbeatRequest,
- heartbeatResponse, joinGroupRequest, joinGroupResponse)
+ heartbeatResponse, joinGroupRequest, joinGroupResponse,
+ transactionRequest, transactionResponse,
+ txCoordinatorMetadataRequest, txCoordinatorMetadataResponse)
+
requestsAndResponses.foreach { original =>
val buffer = ByteBuffer.allocate(original.sizeInBytes)