You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/04/11 11:55:35 UTC

[kafka] branch trunk updated: KAFKA-8436: use automated protocol for AddOffsetsToTxn (#7015)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 430e00e  KAFKA-8436: use automated protocol for AddOffsetsToTxn  (#7015)
430e00e is described below

commit 430e00ea95da959d6d8308dd49c4ed1cdffa7914
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sat Apr 11 04:54:53 2020 -0700

    KAFKA-8436: use automated protocol for AddOffsetsToTxn  (#7015)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../producer/internals/TransactionManager.java     | 18 +++--
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  8 +-
 .../kafka/common/requests/AbstractResponse.java    |  2 +-
 .../common/requests/AddOffsetsToTxnRequest.java    | 94 ++++------------------
 .../common/requests/AddOffsetsToTxnResponse.java   | 78 ++++++------------
 .../kafka/clients/producer/KafkaProducerTest.java  |  5 +-
 .../producer/internals/TransactionManagerTest.java | 14 ++--
 .../kafka/common/requests/RequestResponseTest.java | 14 +++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 24 ++++--
 .../kafka/api/AuthorizerIntegrationTest.scala      | 11 ++-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  8 +-
 11 files changed, 116 insertions(+), 160 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c432e50..fa473be 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
 import org.apache.kafka.common.message.EndTxnRequestData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
@@ -395,9 +396,15 @@ public class TransactionManager {
                     "active transaction");
 
         log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
-        AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
-                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, groupMetadata.groupId());
+        AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
+            new AddOffsetsToTxnRequestData()
+                .setTransactionalId(transactionalId)
+                .setProducerId(producerIdAndEpoch.producerId)
+                .setProducerEpoch(producerIdAndEpoch.epoch)
+                .setGroupId(groupMetadata.groupId())
+        );
         AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets, groupMetadata);
+
         enqueueRequest(handler);
         return handler.result;
     }
@@ -1610,13 +1617,14 @@ public class TransactionManager {
         @Override
         public void handleResponse(AbstractResponse response) {
             AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
-            Errors error = addOffsetsToTxnResponse.error();
+            Errors error = Errors.forCode(addOffsetsToTxnResponse.data.errorCode());
 
             if (error == Errors.NONE) {
-                log.debug("Successfully added partition for consumer group {} to transaction", builder.consumerGroupId());
+                log.debug("Successfully added partition for consumer group {} to transaction", builder.data.groupId());
 
                 // note the result is not completed until the TxnOffsetCommit returns
                 pendingRequests.add(txnOffsetCommitHandler(result, offsets, groupMetadata));
+
                 transactionStarted = true;
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
@@ -1630,7 +1638,7 @@ public class TransactionManager {
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId()));
+                abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
             } else {
                 fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 80d9383..411f337 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.protocol;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.AlterClientQuotasRequestData;
@@ -106,8 +108,6 @@ import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
-import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
 import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
 import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
@@ -174,8 +174,8 @@ public enum ApiKeys {
             OffsetsForLeaderEpochResponse.schemaVersions()),
     ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,
             AddPartitionsToTxnRequest.schemaVersions(), AddPartitionsToTxnResponse.schemaVersions()),
-    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(),
-            AddOffsetsToTxnResponse.schemaVersions()),
+    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequestData.SCHEMAS,
+                       AddOffsetsToTxnResponseData.SCHEMAS),
     END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequestData.SCHEMAS, EndTxnResponseData.SCHEMAS),
     WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequestData.SCHEMAS,
             WriteTxnMarkersResponseData.SCHEMAS),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 7f2f4bc..dcd5439 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -131,7 +131,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
             case ADD_PARTITIONS_TO_TXN:
                 return new AddPartitionsToTxnResponse(struct);
             case ADD_OFFSETS_TO_TXN:
-                return new AddOffsetsToTxnResponse(struct);
+                return new AddOffsetsToTxnResponse(struct, version);
             case END_TXN:
                 return new EndTxnResponse(struct, version);
             case WRITE_TXN_MARKERS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 2668ae1..3b1c746 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -16,124 +16,60 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-
 public class AddOffsetsToTxnRequest extends AbstractRequest {
-    private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
-            TRANSACTIONAL_ID,
-            PRODUCER_ID,
-            PRODUCER_EPOCH,
-            GROUP_ID);
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V1 = ADD_OFFSETS_TO_TXN_REQUEST_V0;
 
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0, ADD_OFFSETS_TO_TXN_REQUEST_V1};
-    }
+    public AddOffsetsToTxnRequestData data;
 
     public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
-        private final String transactionalId;
-        private final long producerId;
-        private final short producerEpoch;
-        private final String consumerGroupId;
+        public AddOffsetsToTxnRequestData data;
 
-        public Builder(String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
+        public Builder(AddOffsetsToTxnRequestData data) {
             super(ApiKeys.ADD_OFFSETS_TO_TXN);
-            this.transactionalId = transactionalId;
-            this.producerId = producerId;
-            this.producerEpoch = producerEpoch;
-            this.consumerGroupId = consumerGroupId;
-        }
-
-        public String consumerGroupId() {
-            return consumerGroupId;
+            this.data = data;
         }
 
         @Override
         public AddOffsetsToTxnRequest build(short version) {
-            return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId);
+            return new AddOffsetsToTxnRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=AddOffsetsToTxnRequest").
-                    append(", transactionalId=").append(transactionalId).
-                    append(", producerId=").append(producerId).
-                    append(", producerEpoch=").append(producerEpoch).
-                    append(", consumerGroupId=").append(consumerGroupId).
-                    append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private final String transactionalId;
-    private final long producerId;
-    private final short producerEpoch;
-    private final String consumerGroupId;
-
-    private AddOffsetsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
+    public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) {
         super(ApiKeys.ADD_OFFSETS_TO_TXN, version);
-        this.transactionalId = transactionalId;
-        this.producerId = producerId;
-        this.producerEpoch = producerEpoch;
-        this.consumerGroupId = consumerGroupId;
+        this.data = data;
     }
 
     public AddOffsetsToTxnRequest(Struct struct, short version) {
         super(ApiKeys.ADD_OFFSETS_TO_TXN, version);
-        this.transactionalId = struct.get(TRANSACTIONAL_ID);
-        this.producerId = struct.get(PRODUCER_ID);
-        this.producerEpoch = struct.get(PRODUCER_EPOCH);
-        this.consumerGroupId = struct.get(GROUP_ID);
-    }
-
-    public String transactionalId() {
-        return transactionalId;
-    }
-
-    public long producerId() {
-        return producerId;
-    }
-
-    public short producerEpoch() {
-        return producerEpoch;
-    }
-
-    public String consumerGroupId() {
-        return consumerGroupId;
+        this.data = new AddOffsetsToTxnRequestData(struct, version);
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version()));
-        struct.set(TRANSACTIONAL_ID, transactionalId);
-        struct.set(PRODUCER_ID, producerId);
-        struct.set(PRODUCER_EPOCH, producerEpoch);
-        struct.set(GROUP_ID, consumerGroupId);
-        return struct;
+        return data.toStruct(version());
     }
 
     @Override
     public AddOffsetsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new AddOffsetsToTxnResponse(throttleTimeMs, Errors.forException(e));
+        return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+                                               .setErrorCode(Errors.forException(e).code())
+                                               .setThrottleTimeMs(throttleTimeMs));
     }
 
     public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) {
         return new AddOffsetsToTxnRequest(ApiKeys.ADD_OFFSETS_TO_TXN.parseRequest(version, buffer), version);
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 867ca6a..3747bb9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -16,86 +16,60 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-
+/**
+ * Possible error codes:
+ *
+ *   - {@link Errors#NOT_COORDINATOR}
+ *   - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ *   - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ *   - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
+ *   - {@link Errors#INVALID_PRODUCER_EPOCH}
+ *   - {@link Errors#INVALID_TXN_STATE}
+ *   - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ *   - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
+ */
 public class AddOffsetsToTxnResponse extends AbstractResponse {
-    private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE);
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V1 = ADD_OFFSETS_TO_TXN_RESPONSE_V0;
 
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_OFFSETS_TO_TXN_RESPONSE_V0, ADD_OFFSETS_TO_TXN_RESPONSE_V1};
-    }
-
-    // Possible error codes:
-    //   NotCoordinator
-    //   CoordinatorNotAvailable
-    //   CoordinatorLoadInProgress
-    //   InvalidProducerIdMapping
-    //   InvalidProducerEpoch
-    //   InvalidTxnState
-    //   GroupAuthorizationFailed
-    //   TransactionalIdAuthorizationFailed
+    public AddOffsetsToTxnResponseData data;
 
-    private final Errors error;
-    private final int throttleTimeMs;
-
-    public AddOffsetsToTxnResponse(int throttleTimeMs, Errors error) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
+    public AddOffsetsToTxnResponse(AddOffsetsToTxnResponseData data) {
+        this.data = data;
     }
 
-    public AddOffsetsToTxnResponse(Struct struct) {
-        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        this.error = Errors.forCode(struct.get(ERROR_CODE));
+    public AddOffsetsToTxnResponse(Struct struct, short version) {
+        this.data = new AddOffsetsToTxnResponseData(struct, version);
     }
 
     @Override
-    public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public Errors error() {
-        return error;
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(Errors.forCode(data.errorCode()));
     }
 
     @Override
-    public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
     }
 
     @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
-        struct.set(ERROR_CODE, error.code());
-        return struct;
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
     }
 
     public static AddOffsetsToTxnResponse parse(ByteBuffer buffer, short version) {
-        return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer));
+        return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer), version);
     }
 
     @Override
     public String toString() {
-        return "AddOffsetsToTxnResponse(" +
-                "error=" + error +
-                ", throttleTimeMs=" + throttleTimeMs +
-                ')';
+        return data.toString();
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 8fec21c..280faa1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -959,7 +960,9 @@ public class KafkaProducerTest {
     }
 
     private AddOffsetsToTxnResponse addOffsetsToTxnResponse(Errors error) {
-        return new AddOffsetsToTxnResponse(10, error);
+        return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+                                               .setErrorCode(error.code())
+                                               .setThrottleTimeMs(10));
     }
 
     private TxnOffsetCommitResponse txnOffsetsCommitResponse(Map<TopicPartition, Errors> errorMap) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index c700605..5650110 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -3258,12 +3259,15 @@ public class TransactionManagerTest {
                                                 final short producerEpoch) {
         client.prepareResponse(body -> {
             AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest) body;
-            assertEquals(consumerGroupId, addOffsetsToTxnRequest.consumerGroupId());
-            assertEquals(transactionalId, addOffsetsToTxnRequest.transactionalId());
-            assertEquals(producerId, addOffsetsToTxnRequest.producerId());
-            assertEquals(producerEpoch, addOffsetsToTxnRequest.producerEpoch());
+            assertEquals(consumerGroupId, addOffsetsToTxnRequest.data.groupId());
+            assertEquals(transactionalId, addOffsetsToTxnRequest.data.transactionalId());
+            assertEquals(producerId, addOffsetsToTxnRequest.data.producerId());
+            assertEquals(producerEpoch, addOffsetsToTxnRequest.data.producerEpoch());
             return true;
-        }, new AddOffsetsToTxnResponse(0, error));
+        }, new AddOffsetsToTxnResponse(
+            new AddOffsetsToTxnResponseData()
+                .setErrorCode(error.code()))
+        );
     }
 
     private void prepareTxnOffsetCommitResponse(final String consumerGroupId,
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7a6ce7f..0f9ab8c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.message.AlterConfigsResponseData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
@@ -1632,11 +1634,19 @@ public class RequestResponseTest {
     }
 
     private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() {
-        return new AddOffsetsToTxnRequest.Builder("tid", 21L, (short) 42, "gid").build();
+        return new AddOffsetsToTxnRequest.Builder(
+            new AddOffsetsToTxnRequestData()
+                .setTransactionalId("tid")
+                .setProducerId(21L)
+                .setProducerEpoch((short) 42)
+                .setGroupId("gid")
+        ).build();
     }
 
     private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() {
-        return new AddOffsetsToTxnResponse(0, Errors.NONE);
+        return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+                                               .setErrorCode(Errors.NONE.code())
+                                               .setThrottleTimeMs(0));
     }
 
     private EndTxnRequest createEndTxnRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 43ab2c5..57ce87c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
-import org.apache.kafka.common.message.{AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducer [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, Heartb [...]
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
 import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
@@ -2154,20 +2154,28 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = {
     ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val addOffsetsToTxnRequest = request.body[AddOffsetsToTxnRequest]
-    val transactionalId = addOffsetsToTxnRequest.transactionalId
-    val groupId = addOffsetsToTxnRequest.consumerGroupId
+    val transactionalId = addOffsetsToTxnRequest.data.transactionalId
+    val groupId = addOffsetsToTxnRequest.data.groupId
     val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
     if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+        new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+          .setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code)
+          .setThrottleTimeMs(requestThrottleMs)))
     else if (!authorize(request.context, READ, GROUP, groupId))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
+        new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+          .setThrottleTimeMs(requestThrottleMs))
+      )
     else {
       def sendResponseCallback(error: Errors): Unit = {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
-          val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error)
+          val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(
+            new AddOffsetsToTxnResponseData()
+              .setErrorCode(error.code)
+              .setThrottleTimeMs(requestThrottleMs))
           trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId on partition " +
             s"$offsetTopicPartition: errors: $error from client ${request.header.clientId}")
           responseBody
@@ -2176,8 +2184,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
-        addOffsetsToTxnRequest.producerId,
-        addOffsetsToTxnRequest.producerEpoch,
+        addOffsetsToTxnRequest.data.producerId,
+        addOffsetsToTxnRequest.data.producerEpoch,
         Set(offsetTopicPartition),
         sendResponseCallback)
     }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1b57cff..9a7ec14 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection}
@@ -184,7 +185,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error),
     ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errors(producerId).get(tp)),
     ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)),
-    ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error),
+    ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => Errors.forCode(resp.data.errorCode)),
     ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
     ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)),
     ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => Errors.forCode(resp.results.asScala.head.errorCode)),
@@ -533,7 +534,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build()
 
-  private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
+  private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(
+    new AddOffsetsToTxnRequestData()
+      .setTransactionalId(transactionalId)
+      .setProducerId(1)
+      .setProducerEpoch(1)
+      .setGroupId(group)
+  ).build()
 
   private def electLeadersRequest = new ElectLeadersRequest.Builder(
     ElectionType.PREFERRED,
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 42220a8..c47d511 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,6 +24,7 @@ import kafka.security.authorizer.AclAuthorizer
 import kafka.utils.TestUtils
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
@@ -395,7 +396,12 @@ class RequestQuotaTest extends BaseRequestTest {
           new AddPartitionsToTxnRequest.Builder("test-transactional-id", 1, 0, List(tp).asJava)
 
         case ApiKeys.ADD_OFFSETS_TO_TXN =>
-          new AddOffsetsToTxnRequest.Builder("test-transactional-id", 1, 0, "test-txn-group")
+          new AddOffsetsToTxnRequest.Builder(new AddOffsetsToTxnRequestData()
+            .setTransactionalId("test-transactional-id")
+            .setProducerId(1)
+            .setProducerEpoch(0)
+            .setGroupId("test-txn-group")
+          )
 
         case ApiKeys.END_TXN =>
           new EndTxnRequest.Builder(new EndTxnRequestData()