You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/02 00:16:48 UTC
[1/2] kafka git commit: KAFKA-5136: move coordinatorEpoch from
WriteTxnMarkerRequest to TxnMarkerEntry
Repository: kafka
Updated Branches:
refs/heads/trunk 67f1f4d27 -> 94a35fd93
KAFKA-5136: move coordinatorEpoch from WriteTxnMarkerRequest to TxnMarkerEntry
Moving the coordinatorEpoch from WriteTxnMarkerRequest to TxnMarkerEntry will generate fewer broker send requests
Author: Damian Guy <da...@gmail.com>
Reviewers: Ismael Juma, Guozhang Wang
Closes #2925 from dguy/tc-write-txn-request-follow-up
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/324b475e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/324b475e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/324b475e
Branch: refs/heads/trunk
Commit: 324b475eca48502fb16c8efd0de99756f68437bf
Parents: 67f1f4d
Author: Damian Guy <da...@gmail.com>
Authored: Mon May 1 17:15:32 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon May 1 17:15:32 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/common/protocol/Protocol.java | 8 +-
.../common/requests/WriteTxnMarkersRequest.java | 46 +++++-----
.../common/requests/RequestResponseTest.java | 6 +-
.../transaction/TransactionMarkerChannel.scala | 97 ++++++++++++--------
.../TransactionMarkerChannelManager.scala | 19 ++--
...nsactionMarkerRequestCompletionHandler.scala | 19 ++--
.../TransactionMarkerChannelManagerTest.scala | 45 ++++-----
.../TransactionMarkerChannelTest.scala | 38 ++++----
...tionMarkerRequestCompletionHandlerTest.scala | 19 ++--
9 files changed, 149 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 3da2b3f..14471da 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1457,13 +1457,13 @@ public class Protocol {
new ArrayOf(new Schema(
new Field("topic", STRING),
new Field("partitions", new ArrayOf(INT32)))),
- "The partitions to write markers for.")
+ "The partitions to write markers for."),
+ new Field("coordinator_epoch",
+ INT32,
+ "Epoch associated with the transaction state partition hosted by this transaction coordinator")
);
public static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
- new Field("coordinator_epoch",
- INT32,
- "Epoch associated with the transaction state partition hosted by this transaction coordinator."),
new Field("transaction_markers",
new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0),
"The transaction markers to be written.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 7cded24..0c09880 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -43,12 +43,18 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
public static class TxnMarkerEntry {
private final long producerId;
private final short producerEpoch;
+ private final int coordinatorEpoch;
private final TransactionResult result;
private final List<TopicPartition> partitions;
- public TxnMarkerEntry(long producerId, short producerEpoch, TransactionResult result, List<TopicPartition> partitions) {
+ public TxnMarkerEntry(long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result,
+ List<TopicPartition> partitions) {
this.producerId = producerId;
this.producerEpoch = producerEpoch;
+ this.coordinatorEpoch = coordinatorEpoch;
this.result = result;
this.partitions = partitions;
}
@@ -61,6 +67,10 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
return producerEpoch;
}
+ public int coordinatorEpoch() {
+ return coordinatorEpoch;
+ }
+
public TransactionResult transactionResult() {
return result;
}
@@ -73,8 +83,9 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
@Override
public String toString() {
return "TxnMarkerEntry{" +
- "pid=" + producerId +
- ", epoch=" + producerEpoch +
+ "producerId=" + producerId +
+ ", producerEpoch=" + producerEpoch +
+ ", coordinatorEpoch=" + coordinatorEpoch +
", result=" + result +
", partitions=" + partitions +
'}';
@@ -87,47 +98,41 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
final TxnMarkerEntry that = (TxnMarkerEntry) o;
return producerId == that.producerId &&
producerEpoch == that.producerEpoch &&
+ coordinatorEpoch == that.coordinatorEpoch &&
result == that.result &&
Objects.equals(partitions, that.partitions);
}
@Override
public int hashCode() {
- return Objects.hash(producerId, producerEpoch, result, partitions);
+ return Objects.hash(producerId, producerEpoch, coordinatorEpoch, result, partitions);
}
}
public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest> {
- private final int coordinatorEpoch;
private final List<TxnMarkerEntry> markers;
- public Builder(int coordinatorEpoch, List<TxnMarkerEntry> markers) {
+ public Builder(List<TxnMarkerEntry> markers) {
super(ApiKeys.WRITE_TXN_MARKERS);
-
this.markers = markers;
- this.coordinatorEpoch = coordinatorEpoch;
}
@Override
public WriteTxnMarkersRequest build(short version) {
- return new WriteTxnMarkersRequest(version, coordinatorEpoch, markers);
+ return new WriteTxnMarkersRequest(version, markers);
}
}
- private final int coordinatorEpoch;
private final List<TxnMarkerEntry> markers;
- private WriteTxnMarkersRequest(short version, int coordinatorEpoch, List<TxnMarkerEntry> markers) {
+ private WriteTxnMarkersRequest(short version, List<TxnMarkerEntry> markers) {
super(version);
this.markers = markers;
- this.coordinatorEpoch = coordinatorEpoch;
}
public WriteTxnMarkersRequest(Struct struct, short version) {
super(version);
- this.coordinatorEpoch = struct.getInt(COORDINATOR_EPOCH_KEY_NAME);
-
List<TxnMarkerEntry> markers = new ArrayList<>();
Object[] markersArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
for (Object markerObj : markersArray) {
@@ -135,6 +140,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
long producerId = markerStruct.getLong(PID_KEY_NAME);
short producerEpoch = markerStruct.getShort(EPOCH_KEY_NAME);
+ int coordinatorEpoch = markerStruct.getInt(COORDINATOR_EPOCH_KEY_NAME);
TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
List<TopicPartition> partitions = new ArrayList<>();
@@ -147,15 +153,12 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
}
}
- markers.add(new TxnMarkerEntry(producerId, producerEpoch, result, partitions));
+ markers.add(new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, partitions));
}
this.markers = markers;
}
- public int coordinatorEpoch() {
- return coordinatorEpoch;
- }
public List<TxnMarkerEntry> markers() {
return markers;
@@ -164,7 +167,6 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.requestSchema(version()));
- struct.set(COORDINATOR_EPOCH_KEY_NAME, coordinatorEpoch);
Object[] markersArray = new Object[markers.size()];
int i = 0;
@@ -172,6 +174,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
markerStruct.set(PID_KEY_NAME, entry.producerId);
markerStruct.set(EPOCH_KEY_NAME, entry.producerEpoch);
+ markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch);
markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id);
Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(entry.partitions);
@@ -216,12 +219,11 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final WriteTxnMarkersRequest that = (WriteTxnMarkersRequest) o;
- return coordinatorEpoch == that.coordinatorEpoch &&
- Objects.equals(markers, that.markers);
+ return Objects.equals(markers, that.markers);
}
@Override
public int hashCode() {
- return Objects.hash(coordinatorEpoch, markers);
+ return Objects.hash(markers);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 422f9e6..7c53b54 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -891,9 +891,9 @@ public class RequestResponseTest {
}
private WriteTxnMarkersRequest createWriteTxnMarkersRequest() {
- return new WriteTxnMarkersRequest.Builder(73,
- Collections.singletonList(new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, TransactionResult.ABORT,
- Collections.singletonList(new TopicPartition("topic", 73))))).build();
+ return new WriteTxnMarkersRequest.Builder(
+ Collections.singletonList(new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, 73, TransactionResult.ABORT,
+ Collections.singletonList(new TopicPartition("topic", 73))))).build();
}
private WriteTxnMarkersResponse createWriteTxnMarkersResponse() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
index 8eb5a8b..cad3ea5 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.transaction
import java.util
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import kafka.common.{BrokerEndPointNotAvailableException, RequestAndCompletionHandler}
import kafka.server.{DelayedOperationPurgatory, MetadataCache}
@@ -26,22 +26,54 @@ import org.apache.kafka.clients.NetworkClient
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.{concurrent, immutable, mutable}
import collection.JavaConverters._
-import collection.JavaConversions._
class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
metadataCache: MetadataCache,
networkClient: NetworkClient,
time: Time) extends Logging {
- // we need the metadataPartition so we can clean up when Transaction Log partitions emigrate
- case class PendingTxnKey(metadataPartition: Int, producerId: Long)
+ // we need the txnTopicPartition so we can clean up when Transaction Log partitions emigrate
+ case class PendingTxnKey(txnTopicPartition: Int, producerId: Long)
- private val brokerStateMap: concurrent.Map[Int, DestinationBrokerAndQueuedMarkers] = concurrent.TrieMap.empty[Int, DestinationBrokerAndQueuedMarkers]
+ class BrokerRequestQueue(private var destination: Node) {
+
+ // keep track of the requests per txn topic partition so we can easily clear the queue
+ // during partition emigration
+ private val requestsPerTxnTopicPartition: concurrent.Map[Int, BlockingQueue[TxnMarkerEntry]]
+ = concurrent.TrieMap.empty[Int, BlockingQueue[TxnMarkerEntry]]
+
+ def removeRequestsForPartition(partition: Int): Unit = {
+ requestsPerTxnTopicPartition.remove(partition)
+ }
+
+ def maybeUpdateNode(node: Node): Unit = {
+ destination = node
+ }
+
+ def addRequests(txnTopicPartition: Int, txnMarkerEntry: TxnMarkerEntry): Unit = {
+ val queue = requestsPerTxnTopicPartition.getOrElseUpdate(txnTopicPartition, new LinkedBlockingQueue[TxnMarkerEntry]())
+ queue.add(txnMarkerEntry)
+ }
+
+ def eachMetadataPartition[B](f:(Int, BlockingQueue[TxnMarkerEntry]) => B): mutable.Iterable[B] =
+ requestsPerTxnTopicPartition.filter{ case(_, queue) => !queue.isEmpty}
+ .map{case(partition:Int, queue:BlockingQueue[TxnMarkerEntry]) => f(partition, queue)}
+
+
+ def node: Node = destination
+
+ def totalQueuedRequests(): Int =
+ requestsPerTxnTopicPartition.map { case(_, queue) => queue.size()}
+ .sum
+
+ }
+
+ private val brokerStateMap: concurrent.Map[Int, BrokerRequestQueue] = concurrent.TrieMap.empty[Int, BrokerRequestQueue]
private val pendingTxnMap: concurrent.Map[PendingTxnKey, TransactionMetadata] = concurrent.TrieMap.empty[PendingTxnKey, TransactionMetadata]
// TODO: What is reasonable for this
@@ -54,41 +86,28 @@ class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
private[transaction]
def drainQueuedTransactionMarkers(txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker]): Iterable[RequestAndCompletionHandler] = {
- brokerStateMap.flatMap {case (brokerId: Int, destAndMarkerQueue: DestinationBrokerAndQueuedMarkers) =>
- val markersToSend: java.util.List[CoordinatorEpochAndMarkers] = new util.ArrayList[CoordinatorEpochAndMarkers] ()
- destAndMarkerQueue.markersQueue.drainTo (markersToSend)
- markersToSend.groupBy{ epochAndMarker => (epochAndMarker.metadataPartition, epochAndMarker.coordinatorEpoch) }
- .map { case((metadataPartition: Int, coordinatorEpoch:Int), buffer: mutable.Buffer[CoordinatorEpochAndMarkers]) =>
- val txnMarkerEntries = buffer.flatMap{_.txnMarkerEntries }.asJava
- val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(
- this,
- txnMarkerPurgatory,
- CoordinatorEpochAndMarkers(metadataPartition, coordinatorEpoch, txnMarkerEntries),
- brokerId)
- RequestAndCompletionHandler(destAndMarkerQueue.destBrokerNode, new WriteTxnMarkersRequest.Builder(coordinatorEpoch, txnMarkerEntries), requestCompletionHandler)
- }
+ brokerStateMap.flatMap {case (brokerId: Int, brokerRequestQueue: BrokerRequestQueue) =>
+ brokerRequestQueue.eachMetadataPartition{ case(partitionId, queue) =>
+ val markersToSend: java.util.List[TxnMarkerEntry] = new util.ArrayList[TxnMarkerEntry]()
+ queue.drainTo(markersToSend)
+ val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(this, txnMarkerPurgatory, partitionId, markersToSend, brokerId)
+ RequestAndCompletionHandler(brokerRequestQueue.node, new WriteTxnMarkersRequest.Builder(markersToSend), requestCompletionHandler)
+ }
}
}
def addOrUpdateBroker(broker: Node) {
- if (brokerStateMap.contains(broker.id())) {
- val brokerQueue = brokerStateMap(broker.id())
- if (!brokerQueue.destBrokerNode.equals(broker)) {
- brokerStateMap.put(broker.id(), DestinationBrokerAndQueuedMarkers(broker, brokerQueue.markersQueue))
- trace(s"Updated destination broker for ${broker.id} from: ${brokerQueue.destBrokerNode} to: $broker")
- }
- } else {
- val markersQueue = new LinkedBlockingQueue[CoordinatorEpochAndMarkers]()
- brokerStateMap.put(broker.id, DestinationBrokerAndQueuedMarkers(broker, markersQueue))
- trace(s"Added destination broker ${broker.id}: $broker")
+ brokerStateMap.putIfAbsent(broker.id(), new BrokerRequestQueue(broker)) match {
+ case Some(brokerQueue) => brokerQueue.maybeUpdateNode(broker)
+ case None => // nothing to do
}
}
- private[transaction] def addRequestForBroker(brokerId: Int, txnMarkerRequest: CoordinatorEpochAndMarkers) {
- val markersQueue = brokerStateMap(brokerId).markersQueue
- markersQueue.add(txnMarkerRequest)
- trace(s"Added markers $txnMarkerRequest for broker $brokerId")
+ private[transaction] def addRequestForBroker(brokerId: Int, metadataPartition: Int, txnMarkerEntry: TxnMarkerEntry) {
+ val brokerQueue = brokerStateMap(brokerId)
+ brokerQueue.addRequests(metadataPartition, txnMarkerEntry)
+ trace(s"Added marker $txnMarkerEntry for broker $brokerId")
}
def addRequestToSend(metadataPartition: Int, pid: Long, epoch: Short, result: TransactionResult, coordinatorEpoch: Int, topicPartitions: immutable.Set[TopicPartition]): Unit = {
@@ -128,8 +147,8 @@ class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
}
for ((brokerId: Int, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
- val txnMarker = new TxnMarkerEntry(pid, epoch, result, topicPartitions.toList.asJava)
- addRequestForBroker(brokerId, CoordinatorEpochAndMarkers(metadataPartition, coordinatorEpoch, Utils.mkList(txnMarker)))
+ val txnMarker = new TxnMarkerEntry(pid, epoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
+ addRequestForBroker(brokerId, metadataPartition, txnMarker)
}
networkClient.wakeup()
}
@@ -153,12 +172,10 @@ class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
}
def removeStateForPartition(partition: Int): mutable.Iterable[Long] = {
- brokerStateMap.foreach {case(_, destinationAndQueue: DestinationBrokerAndQueuedMarkers) =>
- val allMarkers: java.util.List[CoordinatorEpochAndMarkers] = new util.ArrayList[CoordinatorEpochAndMarkers] ()
- destinationAndQueue.markersQueue.drainTo(allMarkers)
- destinationAndQueue.markersQueue.addAll(allMarkers.asScala.filter{ epochAndMarkers => epochAndMarkers.metadataPartition != partition}.asJava)
+ brokerStateMap.foreach { case(_, brokerQueue) =>
+ brokerQueue.removeRequestsForPartition(partition)
}
- pendingTxnMap.filter { case (key: PendingTxnKey, _) => key.metadataPartition == partition }
+ pendingTxnMap.filter { case (key: PendingTxnKey, _) => key.txnTopicPartition == partition }
.map { case (key: PendingTxnKey, _) =>
pendingTxnMap.remove(key)
key.producerId
http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 92c8c72..7121e31 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -16,7 +16,6 @@
*/
package kafka.coordinator.transaction
-import java.util
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
@@ -24,20 +23,14 @@ import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
-import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
+import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.Node
-
-import java.util.concurrent.BlockingQueue
import org.apache.kafka.common.protocol.Errors
import collection.JavaConverters._
-case class CoordinatorEpochAndMarkers(metadataPartition: Int, coordinatorEpoch: Int, txnMarkerEntries: util.List[WriteTxnMarkersRequest.TxnMarkerEntry])
-case class DestinationBrokerAndQueuedMarkers(destBrokerNode: Node, markersQueue: BlockingQueue[CoordinatorEpochAndMarkers])
-
object TransactionMarkerChannelManager {
def apply(config: KafkaConfig,
metrics: Metrics,
@@ -121,10 +114,10 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
}
- def addTxnMarkerRequest(coordinatorPartition: Int, metadata: TransactionMetadata, coordinatorEpoch: Int, completionCallback: WriteTxnMarkerCallback): Unit = {
+ def addTxnMarkerRequest(txnTopicPartition: Int, metadata: TransactionMetadata, coordinatorEpoch: Int, completionCallback: WriteTxnMarkerCallback): Unit = {
val metadataToWrite = metadata synchronized metadata.copy()
- if (!transactionMarkerChannel.maybeAddPendingRequest(coordinatorPartition, metadata))
+ if (!transactionMarkerChannel.maybeAddPendingRequest(txnTopicPartition, metadata))
// TODO: Not sure this is the correct response here?
completionCallback(Errors.INVALID_TXN_STATE)
else {
@@ -136,7 +129,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
case PrepareAbort => TransactionResult.ABORT
case s => throw new IllegalStateException("Unexpected txn metadata state while writing markers: " + s)
}
- transactionMarkerChannel.addRequestToSend(coordinatorPartition,
+ transactionMarkerChannel.addRequestToSend(txnTopicPartition,
metadataToWrite.pid,
metadataToWrite.producerEpoch,
result,
@@ -145,8 +138,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
}
}
- def removeCompleted(metadataPartition: Int, pid: Long): Unit = {
- transactionMarkerChannel.removeCompletedTxn(metadataPartition, pid)
+ def removeCompleted(txnTopicPartition: Int, pid: Long): Unit = {
+ transactionMarkerChannel.removeCompletedTxn(txnTopicPartition, pid)
}
def removeStateForPartition(transactionStateTopicPartitionId: Int): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index cea9775..5d68325 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -30,20 +30,21 @@ import collection.JavaConversions._
class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel: TransactionMarkerChannel,
txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
- epochAndMarkers: CoordinatorEpochAndMarkers,
+ txnTopicPartition: Int,
+ txnMarkerEntries: java.util.List[TxnMarkerEntry],
brokerId: Int) extends RequestCompletionHandler with Logging {
override def onComplete(response: ClientResponse): Unit = {
val correlationId = response.requestHeader.correlationId
if (response.wasDisconnected) {
trace(s"Cancelled request $response due to node ${response.destination} being disconnected")
// re-enqueue the markers
- for (txnMarker: TxnMarkerEntry <- epochAndMarkers.txnMarkerEntries) {
+ for (txnMarker: TxnMarkerEntry <- txnMarkerEntries) {
transactionMarkerChannel.addRequestToSend(
- epochAndMarkers.metadataPartition,
+ txnTopicPartition,
txnMarker.producerId(),
txnMarker.producerEpoch(),
txnMarker.transactionResult(),
- epochAndMarkers.coordinatorEpoch,
+ txnMarker.coordinatorEpoch(),
txnMarker.partitions().toSet)
}
} else {
@@ -51,7 +52,7 @@ class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel: Transa
val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
- for (txnMarker: TxnMarkerEntry <- epochAndMarkers.txnMarkerEntries) {
+ for (txnMarker: TxnMarkerEntry <- txnMarkerEntries) {
val errors = writeTxnMarkerResponse.errors(txnMarker.producerId())
if (errors == null)
@@ -61,10 +62,10 @@ class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel: Transa
for ((topicPartition: TopicPartition, error: Errors) <- errors) {
error match {
case Errors.NONE =>
- transactionMarkerChannel.pendingTxnMetadata(epochAndMarkers.metadataPartition, txnMarker.producerId()) match {
+ transactionMarkerChannel.pendingTxnMetadata(txnTopicPartition, txnMarker.producerId()) match {
case None =>
// TODO: probably need to respond with Errors.NOT_COORDINATOR
- throw new IllegalArgumentException(s"transaction metadata not found during write txn marker request. partition ${epochAndMarkers.metadataPartition} has likely emigrated")
+ throw new IllegalArgumentException(s"transaction metadata not found during write txn marker request. partition ${txnTopicPartition} has likely emigrated")
case Some(metadata) =>
// do not synchronize on this metadata since it will only be accessed by the sender thread
metadata.topicPartitions -= topicPartition
@@ -79,11 +80,11 @@ class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel: Transa
if (retryPartitions.nonEmpty) {
// re-enqueue with possible new leaders of the partitions
transactionMarkerChannel.addRequestToSend(
- epochAndMarkers.metadataPartition,
+ txnTopicPartition,
txnMarker.producerId(),
txnMarker.producerEpoch(),
txnMarker.transactionResult,
- epochAndMarkers.coordinatorEpoch,
+ txnMarker.coordinatorEpoch(),
retryPartitions.toSet)
}
val completed = txnMarkerPurgatory.checkAndComplete(txnMarker.producerId())
http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 352daa2..1c49151 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -79,10 +79,10 @@ class TransactionMarkerChannelManagerTest {
channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1, partition2))
- val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(0,
- Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList(partition1)))).build()
- val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(0,
- Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList(partition2)))).build()
+ val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
+ Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList(partition1)))).build()
+ val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
+ Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList(partition2)))).build()
val requests: Map[Node, WriteTxnMarkersRequest] = requestGenerator().map{ result =>
(result.destination, result.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
@@ -99,38 +99,29 @@ class TransactionMarkerChannelManagerTest {
@Test
def shouldGenerateRequestPerPartitionPerBroker(): Unit ={
- val partitionOneEpoch = 0
- val partitionTwoEpoch = 1
-
EasyMock.expect(metadataCache.getPartitionInfo(partition1.topic(), partition1.partition()))
- .andReturn(Some(PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(1, partitionOneEpoch, List.empty, 0), 0), Set.empty)))
+ .andReturn(Some(PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(1, 0, List.empty, 0), 0), Set.empty)))
EasyMock.expect(metadataCache.getPartitionInfo(partition2.topic(), partition2.partition()))
- .andReturn(Some(PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(1, partitionTwoEpoch, List.empty, 0), 0), Set.empty)))
+ .andReturn(Some(PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(1, 0, List.empty, 0), 0), Set.empty)))
EasyMock.expect(metadataCache.getAliveEndpoint(EasyMock.eq(1), EasyMock.anyObject())).andReturn(Some(broker1)).anyTimes()
EasyMock.replay(metadataCache)
- channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, partitionOneEpoch, Set[TopicPartition](partition1))
- channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, partitionTwoEpoch, Set[TopicPartition](partition2))
+ channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1))
+ channel.addRequestToSend(1, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition2))
- val expectedPartition1Request = new WriteTxnMarkersRequest.Builder(0,
- Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList(partition1)))).build()
- val expectedPartition2Request = new WriteTxnMarkersRequest.Builder(1,
- Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList(partition2)))).build()
+ val expectedPartition1Request = new WriteTxnMarkersRequest.Builder(
+ Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList(partition1)))).build()
+ val expectedPartition2Request = new WriteTxnMarkersRequest.Builder(
+ Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList(partition2)))).build()
val requests = requestGenerator().map { result =>
val markersRequest = result.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()
- (markersRequest.coordinatorEpoch(), (result.destination, markersRequest))
- }.toMap
+ (result.destination, markersRequest)
+ }.toList
- val request1 = requests(partitionOneEpoch)
- val request2 = requests(partitionTwoEpoch)
- assertEquals(broker1, request1._1)
- assertEquals(broker1, request2._1)
- assertEquals(2, requests.size)
- assertEquals(expectedPartition1Request, request1._2)
- assertEquals(expectedPartition2Request, request2._2)
+ assertEquals(List((broker1, expectedPartition1Request), (broker1, expectedPartition2Request)), requests)
}
@Test
@@ -142,8 +133,10 @@ class TransactionMarkerChannelManagerTest {
channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1))
- assertTrue(requestGenerator().nonEmpty)
- assertTrue(requestGenerator().isEmpty)
+ val result = requestGenerator()
+ assertTrue(result.nonEmpty)
+ val result2 = requestGenerator()
+ assertTrue(result2.isEmpty)
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala
index 6bfeb9b..89a7606 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala
@@ -48,8 +48,8 @@ class TransactionMarkerChannelTest {
def shouldAddEmptyBrokerQueueWhenAddingNewBroker(): Unit = {
channel.addOrUpdateBroker(new Node(1, "host", 10))
channel.addOrUpdateBroker(new Node(2, "host", 10))
- assertEquals(0, channel.queueForBroker(1).get.markersQueue.size())
- assertEquals(0, channel.queueForBroker(2).get.markersQueue.size())
+ assertEquals(0, channel.queueForBroker(1).get.eachMetadataPartition{case(partition:Int, _) => partition}.size)
+ assertEquals(0, channel.queueForBroker(2).get.eachMetadataPartition{case(partition:Int, _) => partition}.size)
}
@Test
@@ -66,21 +66,22 @@ class TransactionMarkerChannelTest {
channel.addOrUpdateBroker(new Node(1, "host", 10))
channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1))
- val destinationAndQueue = channel.queueForBroker(1).get
- assertEquals(newDestination, destinationAndQueue.destBrokerNode)
- assertEquals(1, destinationAndQueue.markersQueue.size())
+ val brokerRequestQueue = channel.queueForBroker(1).get
+ assertEquals(newDestination, brokerRequestQueue.node)
+ assertEquals(1, brokerRequestQueue.totalQueuedRequests())
}
+
@Test
def shouldQueueRequestsByBrokerId(): Unit = {
channel.addOrUpdateBroker(new Node(1, "host", 10))
channel.addOrUpdateBroker(new Node(2, "otherhost", 10))
- channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(0, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList()))))
- channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(0, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList()))))
- channel.addRequestForBroker(2, CoordinatorEpochAndMarkers(0, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList()))))
+ channel.addRequestForBroker(1, 0, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList()))
+ channel.addRequestForBroker(1, 0, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList()))
+ channel.addRequestForBroker(2, 0, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList()))
- assertEquals(2, channel.queueForBroker(1).get.markersQueue.size())
- assertEquals(1, channel.queueForBroker(2).get.markersQueue.size())
+ assertEquals(2, channel.queueForBroker(1).get.totalQueuedRequests())
+ assertEquals(1, channel.queueForBroker(2).get.totalQueuedRequests())
}
@Test
@@ -105,8 +106,8 @@ class TransactionMarkerChannelTest {
EasyMock.replay(metadataCache)
channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1, partition2))
- assertEquals(1, channel.queueForBroker(1).get.markersQueue.size)
- assertEquals(1, channel.queueForBroker(2).get.markersQueue.size)
+ assertEquals(1, channel.queueForBroker(1).get.totalQueuedRequests())
+ assertEquals(1, channel.queueForBroker(2).get.totalQueuedRequests())
}
@Test
def shouldWakeupNetworkClientWhenRequestsQueued(): Unit = {
@@ -131,7 +132,7 @@ class TransactionMarkerChannelTest {
EasyMock.replay(metadataCache)
channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1))
- assertEquals(1, channel.queueForBroker(1).get.markersQueue.size)
+ assertEquals(1, channel.queueForBroker(1).get.totalQueuedRequests())
EasyMock.verify(metadataCache)
}
@@ -161,16 +162,15 @@ class TransactionMarkerChannelTest {
@Test
def shouldRemoveBrokerRequestsForPartitionWhenPartitionEmigrated(): Unit = {
channel.addOrUpdateBroker(new Node(1, "host", 10))
- channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(0, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList()))))
- channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(1, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList()))))
- channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(1, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT, Utils.mkList()))))
+ channel.addRequestForBroker(1, 0, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList()))
+ channel.addRequestForBroker(1, 1, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList()))
+ channel.addRequestForBroker(1, 1, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList()))
channel.removeStateForPartition(1)
- val markersQueue = channel.queueForBroker(1).get.markersQueue
- assertEquals(1, markersQueue.size())
- assertEquals(0, markersQueue.peek().metadataPartition)
+ val result = channel.queueForBroker(1).get.eachMetadataPartition{case (partition:Int, _) => partition}.toList
+ assertEquals(List(0), result)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 45ea2da..096b826 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.transaction
import java.{lang, util}
-import kafka.coordinator.transaction._
import kafka.server.DelayedOperationPurgatory
import kafka.utils.timer.MockTimer
import org.apache.kafka.clients.ClientResponse
@@ -37,19 +36,15 @@ class TransactionMarkerRequestCompletionHandlerTest {
private val markerChannel = EasyMock.createNiceMock(classOf[TransactionMarkerChannel])
private val purgatory = new DelayedOperationPurgatory[DelayedTxnMarker]("txn-purgatory-name", new MockTimer, reaperEnabled = false)
private val topic1 = new TopicPartition("topic1", 0)
- private val epochAndMarkers = CoordinatorEpochAndMarkers(0,
- 0,
+ private val txnMarkers =
Utils.mkList(
- new WriteTxnMarkersRequest.TxnMarkerEntry(0,
- 0,
- TransactionResult.COMMIT,
- Utils.mkList(topic1))))
+ new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList(topic1)))
- private val handler = new TransactionMarkerRequestCompletionHandler(markerChannel, purgatory, epochAndMarkers, 0)
+ private val handler = new TransactionMarkerRequestCompletionHandler(markerChannel, purgatory, 0, txnMarkers, 0)
@Test
def shouldReEnqueuePartitionsWhenBrokerDisconnected(): Unit = {
- EasyMock.expect(markerChannel.addRequestToSend(epochAndMarkers.metadataPartition, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](topic1)))
+ EasyMock.expect(markerChannel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](topic1)))
EasyMock.replay(markerChannel)
handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, true, null, null))
@@ -76,7 +71,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
val metadata = new TransactionMetadata(0, 0, 0, PrepareCommit, mutable.Set[TopicPartition](topic1), 0, 0)
- EasyMock.expect(markerChannel.pendingTxnMetadata(epochAndMarkers.metadataPartition, 0))
+ EasyMock.expect(markerChannel.pendingTxnMetadata(0, 0))
.andReturn(Some(metadata))
EasyMock.replay(markerChannel)
@@ -96,7 +91,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
completed = true
}), Seq(0L))
- EasyMock.expect(markerChannel.pendingTxnMetadata(epochAndMarkers.metadataPartition, 0))
+ EasyMock.expect(markerChannel.pendingTxnMetadata(0, 0))
.andReturn(Some(metadata))
EasyMock.replay(markerChannel)
@@ -144,7 +139,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.UNKNOWN_TOPIC_OR_PARTITION))
val metadata = new TransactionMetadata(0, 0, 0, PrepareCommit, mutable.Set[TopicPartition](topic1), 0, 0)
- EasyMock.expect(markerChannel.addRequestToSend(epochAndMarkers.metadataPartition, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](topic1)))
+ EasyMock.expect(markerChannel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](topic1)))
EasyMock.replay(markerChannel)
handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
[2/2] kafka git commit: HOTFIX: fix unit tests for KAFKA-5136
Posted by gu...@apache.org.
HOTFIX: fix unit tests for KAFKA-5136
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94a35fd9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94a35fd9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94a35fd9
Branch: refs/heads/trunk
Commit: 94a35fd93684ea5713777c6b4e1631642de5f377
Parents: 324b475
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon May 1 17:16:42 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon May 1 17:16:42 2017 -0700
----------------------------------------------------------------------
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/94a35fd9/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index bca7bb0..5e91c9b 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -266,7 +266,7 @@ class RequestQuotaTest extends BaseRequestTest {
new EndTxnRequest.Builder("txn1", 1, 0, TransactionResult.forId(false))
case ApiKeys.WRITE_TXN_MARKERS =>
- new WriteTxnMarkersRequest.Builder(0, List.empty.asJava)
+ new WriteTxnMarkersRequest.Builder(List.empty.asJava)
case ApiKeys.TXN_OFFSET_COMMIT =>
new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, 3600, Map.empty.asJava)