You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/03/19 14:27:38 UTC

[kafka] branch trunk updated: KAFKA-8618: Replace Txn marker with automated protocol (#7039)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7164a3  KAFKA-8618: Replace Txn marker with automated protocol (#7039)
c7164a3 is described below

commit c7164a3866ab6a28eb5ae158ff29ac806f4f3d54
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Mar 19 07:26:41 2020 -0700

    KAFKA-8618: Replace Txn marker with automated protocol (#7039)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   8 +-
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../common/requests/WriteTxnMarkersRequest.java    | 192 ++++++++-------------
 .../common/requests/WriteTxnMarkersResponse.java   | 164 +++++++-----------
 .../requests/WriteTxnMarkersRequestTest.java       |  81 +++++++++
 .../requests/WriteTxnMarkersResponseTest.java      |  60 +++++++
 6 files changed, 277 insertions(+), 230 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6549d8f..e969170 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -95,6 +95,8 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData;
 import org.apache.kafka.common.message.UpdateMetadataResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -120,8 +122,6 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
-import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
-import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -177,8 +177,8 @@ public enum ApiKeys {
     ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(),
             AddOffsetsToTxnResponse.schemaVersions()),
     END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequestData.SCHEMAS, EndTxnResponseData.SCHEMAS),
-    WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequest.schemaVersions(),
-            WriteTxnMarkersResponse.schemaVersions()),
+    WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequestData.SCHEMAS,
+            WriteTxnMarkersResponseData.SCHEMAS),
     TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequestData.SCHEMAS,
                       TxnOffsetCommitResponseData.SCHEMAS),
     DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequestData.SCHEMAS, DescribeAclsResponseData.SCHEMAS),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 9d3f39d..301a4fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -135,7 +135,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
             case END_TXN:
                 return new EndTxnResponse(struct, version);
             case WRITE_TXN_MARKERS:
-                return new WriteTxnMarkersResponse(struct);
+                return new WriteTxnMarkersResponse(struct, version);
             case TXN_OFFSET_COMMIT:
                 return new TxnOffsetCommitResponse(struct, version);
             case DESCRIBE_ACLS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 33f9bb5..c272633 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -17,13 +17,12 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData.WritableTxnMarker;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData.WritableTxnMarkerTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -32,40 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-
 public class WriteTxnMarkersRequest extends AbstractRequest {
-    private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
-    private static final String TXN_MARKERS_KEY_NAME = "transaction_markers";
-
-    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
-    private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
-            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
-            new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
-            new Field(TRANSACTION_RESULT_KEY_NAME, BOOLEAN, "The result of the transaction to write to the " +
-                    "partitions (false = ABORT, true = COMMIT)."),
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))), "The partitions to write markers for."),
-            new Field(COORDINATOR_EPOCH_KEY_NAME, INT32, "Epoch associated with the transaction state partition " +
-                    "hosted by this transaction coordinator"));
-
-    private static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
-            new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "The transaction markers to " +
-                    "be written."));
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{WRITE_TXN_MARKERS_REQUEST_V0};
-    }
 
     public static class TxnMarkerEntry {
         private final long producerId;
@@ -106,16 +72,15 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
             return partitions;
         }
 
-
         @Override
         public String toString() {
             return "TxnMarkerEntry{" +
-                    "producerId=" + producerId +
-                    ", producerEpoch=" + producerEpoch +
-                    ", coordinatorEpoch=" + coordinatorEpoch +
-                    ", result=" + result +
-                    ", partitions=" + partitions +
-                    '}';
+                       "producerId=" + producerId +
+                       ", producerEpoch=" + producerEpoch +
+                       ", coordinatorEpoch=" + coordinatorEpoch +
+                       ", result=" + result +
+                       ", partitions=" + partitions +
+                       '}';
         }
 
         @Override
@@ -124,10 +89,10 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
             if (o == null || getClass() != o.getClass()) return false;
             final TxnMarkerEntry that = (TxnMarkerEntry) o;
             return producerId == that.producerId &&
-                    producerEpoch == that.producerEpoch &&
-                    coordinatorEpoch == that.coordinatorEpoch &&
-                    result == that.result &&
-                    Objects.equals(partitions, that.partitions);
+                       producerEpoch == that.producerEpoch &&
+                       coordinatorEpoch == that.coordinatorEpoch &&
+                       result == that.result &&
+                       Objects.equals(partitions, that.partitions);
         }
 
         @Override
@@ -137,106 +102,93 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
     }
 
     public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest> {
-        private final List<TxnMarkerEntry> markers;
 
-        public Builder(List<TxnMarkerEntry> markers) {
+        public final WriteTxnMarkersRequestData data;
+
+        public Builder(final List<TxnMarkerEntry> markers) {
             super(ApiKeys.WRITE_TXN_MARKERS);
-            this.markers = markers;
+            List<WritableTxnMarker> dataMarkers = new ArrayList<>();
+            for (TxnMarkerEntry marker : markers) {
+                final Map<String, WritableTxnMarkerTopic> topicMap = new HashMap<>();
+                for (TopicPartition topicPartition : marker.partitions) {
+                    WritableTxnMarkerTopic topic = topicMap.getOrDefault(topicPartition.topic(),
+                                                                         new WritableTxnMarkerTopic()
+                                                                             .setName(topicPartition.topic()));
+                    topic.partitionIndexes().add(topicPartition.partition());
+                    topicMap.put(topicPartition.topic(), topic);
+                }
+
+                dataMarkers.add(new WritableTxnMarker()
+                                    .setProducerId(marker.producerId)
+                                    .setProducerEpoch(marker.producerEpoch)
+                                    .setCoordinatorEpoch(marker.coordinatorEpoch)
+                                    .setTransactionResult(marker.transactionResult().id)
+                                    .setTopics(new ArrayList<>(topicMap.values())));
+            }
+            this.data = new WriteTxnMarkersRequestData().setMarkers(dataMarkers);
         }
 
         @Override
         public WriteTxnMarkersRequest build(short version) {
-            return new WriteTxnMarkersRequest(version, markers);
+            return new WriteTxnMarkersRequest(data, version);
         }
     }
 
-    private final List<TxnMarkerEntry> markers;
+    public final WriteTxnMarkersRequestData data;
 
-    private WriteTxnMarkersRequest(short version, List<TxnMarkerEntry> markers) {
+    private WriteTxnMarkersRequest(WriteTxnMarkersRequestData data, short version) {
         super(ApiKeys.WRITE_TXN_MARKERS, version);
-
-        this.markers = markers;
+        this.data = data;
     }
 
     public WriteTxnMarkersRequest(Struct struct, short version) {
         super(ApiKeys.WRITE_TXN_MARKERS, version);
-        List<TxnMarkerEntry> markers = new ArrayList<>();
-        Object[] markersArray = struct.getArray(TXN_MARKERS_KEY_NAME);
-        for (Object markerObj : markersArray) {
-            Struct markerStruct = (Struct) markerObj;
-
-            long producerId = markerStruct.getLong(PRODUCER_ID_KEY_NAME);
-            short producerEpoch = markerStruct.getShort(PRODUCER_EPOCH_KEY_NAME);
-            int coordinatorEpoch = markerStruct.getInt(COORDINATOR_EPOCH_KEY_NAME);
-            TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
-
-            List<TopicPartition> partitions = new ArrayList<>();
-            Object[] topicPartitionsArray = markerStruct.getArray(TOPICS_KEY_NAME);
-            for (Object topicPartitionObj : topicPartitionsArray) {
-                Struct topicPartitionStruct = (Struct) topicPartitionObj;
-                String topic = topicPartitionStruct.get(TOPIC_NAME);
-                for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
-                    partitions.add(new TopicPartition(topic, (Integer) partitionObj));
-                }
-            }
-
-            markers.add(new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, partitions));
-        }
-
-        this.markers = markers;
-    }
-
-
-    public List<TxnMarkerEntry> markers() {
-        return markers;
+        this.data = new WriteTxnMarkersRequestData(struct, version);
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.requestSchema(version()));
-
-        Object[] markersArray = new Object[markers.size()];
-        int i = 0;
-        for (TxnMarkerEntry entry : markers) {
-            Struct markerStruct = struct.instance(TXN_MARKERS_KEY_NAME);
-            markerStruct.set(PRODUCER_ID_KEY_NAME, entry.producerId);
-            markerStruct.set(PRODUCER_EPOCH_KEY_NAME, entry.producerEpoch);
-            markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch);
-            markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id);
-
-            Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupPartitionsByTopic(entry.partitions);
-            Object[] partitionsArray = new Object[mappedPartitions.size()];
-            int j = 0;
-            for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
-                Struct topicPartitionsStruct = markerStruct.instance(TOPICS_KEY_NAME);
-                topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
-                topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
-                partitionsArray[j++] = topicPartitionsStruct;
-            }
-            markerStruct.set(TOPICS_KEY_NAME, partitionsArray);
-            markersArray[i++] = markerStruct;
-        }
-        struct.set(TXN_MARKERS_KEY_NAME, markersArray);
-
-        return struct;
+        return data.toStruct(version());
     }
 
     @Override
     public WriteTxnMarkersResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
 
-        Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(markers.size());
-        for (TxnMarkerEntry entry : markers) {
-            Map<TopicPartition, Errors> errorsPerPartition = new HashMap<>(entry.partitions.size());
-            for (TopicPartition partition : entry.partitions)
-                errorsPerPartition.put(partition, error);
-
-            errors.put(entry.producerId, errorsPerPartition);
+        final Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(data.markers().size());
+        for (WritableTxnMarker markerEntry : data.markers()) {
+            Map<TopicPartition, Errors> errorsPerPartition = new HashMap<>();
+            for (WritableTxnMarkerTopic topic : markerEntry.topics()) {
+                for (Integer partitionIdx : topic.partitionIndexes()) {
+                    errorsPerPartition.put(new TopicPartition(topic.name(), partitionIdx), error);
+                }
+            }
+            errors.put(markerEntry.producerId(), errorsPerPartition);
         }
 
         return new WriteTxnMarkersResponse(errors);
     }
 
+    public List<TxnMarkerEntry> markers() {
+        List<TxnMarkerEntry> markers = new ArrayList<>();
+        for (WritableTxnMarker markerEntry : data.markers()) {
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            for (WritableTxnMarkerTopic topic : markerEntry.topics()) {
+                for (Integer partitionIdx : topic.partitionIndexes()) {
+                    topicPartitions.add(new TopicPartition(topic.name(), partitionIdx));
+                }
+            }
+            markers.add(new TxnMarkerEntry(
+                markerEntry.producerId(),
+                markerEntry.producerEpoch(),
+                markerEntry.coordinatorEpoch(),
+                TransactionResult.forId(markerEntry.transactionResult()),
+                topicPartitions)
+            );
+        }
+        return markers;
+    }
+
     public static WriteTxnMarkersRequest parse(ByteBuffer buffer, short version) {
         return new WriteTxnMarkersRequest(ApiKeys.WRITE_TXN_MARKERS.parseRequest(version, buffer), version);
     }
@@ -246,11 +198,11 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final WriteTxnMarkersRequest that = (WriteTxnMarkersRequest) o;
-        return Objects.equals(markers, that.markers);
+        return Objects.equals(this.data, that.data);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(markers);
+        return Objects.hash(this.data);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 92b5fc0..3fde41a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -17,133 +17,87 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData.WritableTxnMarkerResult;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-
+/**
+ * Possible error codes:
+ *
+ *   - {@link Errors#CORRUPT_MESSAGE}
+ *   - {@link Errors#INVALID_PRODUCER_EPOCH}
+ *   - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
+ *   - {@link Errors#NOT_LEADER_FOR_PARTITION}
+ *   - {@link Errors#MESSAGE_TOO_LARGE}
+ *   - {@link Errors#RECORD_LIST_TOO_LARGE}
+ *   - {@link Errors#NOT_ENOUGH_REPLICAS}
+ *   - {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND}
+ *   - {@link Errors#INVALID_REQUIRED_ACKS}
+ *   - {@link Errors#TRANSACTION_COORDINATOR_FENCED}
+ *   - {@link Errors#REQUEST_TIMED_OUT}
+ *   - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+ */
 public class WriteTxnMarkersResponse extends AbstractResponse {
-    private static final String TXN_MARKERS_KEY_NAME = "transaction_markers";
-
-    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0 = new Schema(
-            PARTITION_ID,
-            ERROR_CODE);
-
-    private static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
-            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0)))),
-                    "Errors per partition from writing markers."));
-
-    private static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema(
-            new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "Errors per partition from " +
-                    "writing markers."));
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{WRITE_TXN_MARKERS_RESPONSE_V0};
-    }
-
-    // Possible error codes:
-    //   CorruptRecord
-    //   InvalidProducerEpoch
-    //   UnknownTopicOrPartition
-    //   NotLeaderForPartition
-    //   MessageTooLarge
-    //   RecordListTooLarge
-    //   NotEnoughReplicas
-    //   NotEnoughReplicasAfterAppend
-    //   InvalidRequiredAcks
-    //   TransactionCoordinatorFenced
-    //   RequestTimeout
-    //   ClusterAuthorizationFailed
 
     private final Map<Long, Map<TopicPartition, Errors>> errors;
+    public final WriteTxnMarkersResponseData data;
 
     public WriteTxnMarkersResponse(Map<Long, Map<TopicPartition, Errors>> errors) {
-        this.errors = errors;
-    }
-
-    public WriteTxnMarkersResponse(Struct struct) {
-        Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>();
-
-        Object[] responseArray = struct.getArray(TXN_MARKERS_KEY_NAME);
-        for (Object responseObj : responseArray) {
-            Struct responseStruct = (Struct) responseObj;
-
-            long producerId = responseStruct.getLong(PRODUCER_ID_KEY_NAME);
-
-            Map<TopicPartition, Errors> errorPerPartition = new HashMap<>();
-            Object[] topicPartitionsArray = responseStruct.getArray(TOPICS_KEY_NAME);
-            for (Object topicPartitionObj : topicPartitionsArray) {
-                Struct topicPartitionStruct = (Struct) topicPartitionObj;
-                String topic = topicPartitionStruct.get(TOPIC_NAME);
-                for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
-                    Struct partitionStruct = (Struct) partitionObj;
-                    Integer partition = partitionStruct.get(PARTITION_ID);
-                    Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
-                    errorPerPartition.put(new TopicPartition(topic, partition), error);
-                }
+        List<WritableTxnMarkerResult> markers = new ArrayList<>();
+        for (Map.Entry<Long, Map<TopicPartition, Errors>> markerEntry : errors.entrySet()) {
+            Map<String, WritableTxnMarkerTopicResult> responseTopicDataMap = new HashMap<>();
+            for (Map.Entry<TopicPartition, Errors> topicEntry : markerEntry.getValue().entrySet()) {
+                TopicPartition topicPartition = topicEntry.getKey();
+                String topicName = topicPartition.topic();
+
+                WritableTxnMarkerTopicResult topic =
+                    responseTopicDataMap.getOrDefault(topicName, new WritableTxnMarkerTopicResult().setName(topicName));
+                topic.partitions().add(new WritableTxnMarkerPartitionResult()
+                                           .setErrorCode(topicEntry.getValue().code())
+                                           .setPartitionIndex(topicPartition.partition())
+                );
+                responseTopicDataMap.put(topicName, topic);
             }
-            errors.put(producerId, errorPerPartition);
+
+            markers.add(new WritableTxnMarkerResult()
+                            .setProducerId(markerEntry.getKey())
+                            .setTopics(new ArrayList<>(responseTopicDataMap.values()))
+            );
         }
 
         this.errors = errors;
+        this.data = new WriteTxnMarkersResponseData()
+                        .setMarkers(markers);
     }
 
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.responseSchema(version));
-
-        Object[] responsesArray = new Object[errors.size()];
-        int k = 0;
-        for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) {
-            Struct responseStruct = struct.instance(TXN_MARKERS_KEY_NAME);
-            responseStruct.set(PRODUCER_ID_KEY_NAME, responseEntry.getKey());
-
-            Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue();
-            Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupPartitionDataByTopic(partitionAndErrors);
-            Object[] partitionsArray = new Object[mappedPartitions.size()];
-            int i = 0;
-            for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
-                Struct topicPartitionsStruct = responseStruct.instance(TOPICS_KEY_NAME);
-                topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
-                Map<Integer, Errors> partitionIdAndErrors = topicAndPartitions.getValue();
-
-                Object[] partitionAndErrorsArray = new Object[partitionIdAndErrors.size()];
-                int j = 0;
-                for (Map.Entry<Integer, Errors> partitionAndError : partitionIdAndErrors.entrySet()) {
-                    Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
-                    partitionAndErrorStruct.set(PARTITION_ID, partitionAndError.getKey());
-                    partitionAndErrorStruct.set(ERROR_CODE, partitionAndError.getValue().code());
-                    partitionAndErrorsArray[j++] = partitionAndErrorStruct;
+    public WriteTxnMarkersResponse(Struct struct, short version) {
+        this.data = new WriteTxnMarkersResponseData(struct, version);
+        this.errors = new HashMap<>();
+        for (WritableTxnMarkerResult marker : data.markers()) {
+            Map<TopicPartition, Errors> topicPartitionErrorsMap = new HashMap<>();
+            for (WritableTxnMarkerTopicResult topic: marker.topics()) {
+                for (WritableTxnMarkerPartitionResult partitionResult: topic.partitions()) {
+                    topicPartitionErrorsMap.put(new TopicPartition(topic.name(), partitionResult.partitionIndex()),
+                                                Errors.forCode(partitionResult.errorCode()));
                 }
-                topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionAndErrorsArray);
-                partitionsArray[i++] = topicPartitionsStruct;
             }
-            responseStruct.set(TOPICS_KEY_NAME, partitionsArray);
-
-            responsesArray[k++] = responseStruct;
+            errors.put(marker.producerId(), topicPartitionErrorsMap);
         }
+    }
 
-        struct.set(TXN_MARKERS_KEY_NAME, responsesArray);
-        return struct;
+    @Override
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
     }
 
     public Map<TopicPartition, Errors> errors(long producerId) {
@@ -161,6 +115,6 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
     }
 
     public static WriteTxnMarkersResponse parse(ByteBuffer buffer, short version) {
-        return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer));
+        return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer), version);
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
new file mode 100644
index 0000000..7273e47
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class WriteTxnMarkersRequestTest {
+
+    private static long producerId = 10L;
+    private static short producerEpoch = 2;
+    private static int coordinatorEpoch = 1;
+    private static TransactionResult result = TransactionResult.COMMIT;
+    private static TopicPartition topicPartition = new TopicPartition("topic", 73);
+
+    protected static int throttleTimeMs = 10;
+
+    private static List<WriteTxnMarkersRequest.TxnMarkerEntry> markers;
+
+    @Before
+    public void setUp() {
+        markers = Collections.singletonList(
+             new WriteTxnMarkersRequest.TxnMarkerEntry(
+                 producerId, producerEpoch, coordinatorEpoch,
+                 result, Collections.singletonList(topicPartition))
+        );
+    }
+
+    @Test
+    public void testConstructor() {
+        WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markers);
+        for (short version = 0; version <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(); version++) {
+            WriteTxnMarkersRequest request = builder.build(version);
+            assertEquals(1, request.markers().size());
+            WriteTxnMarkersRequest.TxnMarkerEntry marker = request.markers().get(0);
+            assertEquals(producerId, marker.producerId());
+            assertEquals(producerEpoch, marker.producerEpoch());
+            assertEquals(coordinatorEpoch, marker.coordinatorEpoch());
+            assertEquals(result, marker.transactionResult());
+            assertEquals(Collections.singletonList(topicPartition), marker.partitions());
+        }
+    }
+
+    @Test
+    public void testGetErrorResponse() {
+        WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markers);
+        for (short version = 0; version <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(); version++) {
+            WriteTxnMarkersRequest request = builder.build(version);
+            WriteTxnMarkersResponse errorResponse =
+                request.getErrorResponse(throttleTimeMs, Errors.UNKNOWN_PRODUCER_ID.exception());
+
+            assertEquals(Collections.singletonMap(
+                topicPartition, Errors.UNKNOWN_PRODUCER_ID), errorResponse.errors(producerId));
+            assertEquals(Collections.singletonMap(Errors.UNKNOWN_PRODUCER_ID, 1), errorResponse.errorCounts());
+            // Write txn marker has no throttle time defined in response.
+            assertEquals(0, errorResponse.throttleTimeMs());
+        }
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersResponseTest.java
new file mode 100644
index 0000000..4cec88c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersResponseTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class WriteTxnMarkersResponseTest {
+
+    private static long producerIdOne = 1L;
+    private static long producerIdTwo = 2L;
+
+    private static TopicPartition tp1 = new TopicPartition("topic", 1);
+    private static TopicPartition tp2 = new TopicPartition("topic", 2);
+
+    private static Errors pidOneError = Errors.UNKNOWN_PRODUCER_ID;
+    private static Errors pidTwoError = Errors.INVALID_PRODUCER_EPOCH;
+
+    private static Map<Long, Map<TopicPartition, Errors>> errorMap;
+
+    @Before
+    public void setUp() {
+        errorMap = new HashMap<>();
+        errorMap.put(producerIdOne, Collections.singletonMap(tp1, pidOneError));
+        errorMap.put(producerIdTwo, Collections.singletonMap(tp2, pidTwoError));
+    }
+    @Test
+    public void testConstructorWithStruct() {
+
+        Map<Errors, Integer> expectedErrorCounts = new HashMap<>();
+        expectedErrorCounts.put(Errors.UNKNOWN_PRODUCER_ID, 1);
+        expectedErrorCounts.put(Errors.INVALID_PRODUCER_EPOCH, 1);
+        WriteTxnMarkersResponse response = new WriteTxnMarkersResponse(errorMap);
+        assertEquals(expectedErrorCounts, response.errorCounts());
+        assertEquals(Collections.singletonMap(tp1, pidOneError), response.errors(producerIdOne));
+        assertEquals(Collections.singletonMap(tp2, pidTwoError), response.errors(producerIdTwo));
+    }
+}