You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/11 15:42:54 UTC

[kafka] branch trunk updated: MINOR: Use generated InitProducerId RPC (#6538)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53e95ff  MINOR: Use generated InitProducerId RPC (#6538)
53e95ff is described below

commit 53e95ffcdb1cecbba67eb726aa2abcab3ae49c66
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Apr 11 08:27:08 2019 -0700

    MINOR: Use generated InitProducerId RPC (#6538)
    
    This patch updates the InitProducerId request API to use the generated sources. It also fixes a small bug in the DescribeAclsRequest class where we were using the wrong API key (DELETE_ACLS instead of DESCRIBE_ACLS).
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Colin McCabe <cm...@apache.org>
---
 .../kafka/clients/producer/internals/Sender.java   |  8 +-
 .../producer/internals/TransactionManager.java     | 10 ++-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  7 +-
 .../kafka/common/requests/AbstractRequest.java     |  2 +
 .../kafka/common/requests/AbstractResponse.java    |  2 +-
 .../kafka/common/requests/CreateTopicsRequest.java |  9 +-
 .../kafka/common/requests/DescribeAclsRequest.java |  4 +-
 .../common/requests/InitProducerIdRequest.java     | 91 +++++++-------------
 .../common/requests/InitProducerIdResponse.java    | 97 +++++-----------------
 .../common/message/InitProducerIdRequest.json      |  2 +-
 .../common/message/InitProducerIdResponse.json     |  2 +-
 .../clients/producer/internals/SenderTest.java     | 25 ++++--
 .../producer/internals/TransactionManagerTest.java | 26 +++---
 .../kafka/common/requests/RequestResponseTest.java | 46 ++++++----
 core/src/main/scala/kafka/server/KafkaApis.scala   | 20 ++---
 .../scala/unit/kafka/server/RequestQuotaTest.scala | 19 +++--
 16 files changed, 160 insertions(+), 210 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6189aae..33bc496 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -480,7 +481,10 @@ public class Sender implements Runnable {
 
     private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException {
         String nodeId = node.idString();
-        InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
+        InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                .setTransactionalId(null)
+                .setTransactionTimeoutMs(Integer.MAX_VALUE);
+        InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(requestData);
         ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, requestTimeoutMs, null);
         return NetworkClientUtils.sendAndReceive(client, request, time);
     }
@@ -504,7 +508,7 @@ public class Sender implements Runnable {
                     Errors error = initProducerIdResponse.error();
                     if (error == Errors.NONE) {
                         ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
-                                initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+                                initProducerIdResponse.data.producerId(), initProducerIdResponse.data.producerEpoch());
                         transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
                         return;
                     } else if (error.exception() instanceof RetriableException) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index b619093..b34cc98 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
@@ -254,8 +255,10 @@ public class TransactionManager {
         return handleCachedTransactionRequestResult(() -> {
             transitionTo(State.INITIALIZING);
             setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
-            InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
-            InitProducerIdHandler handler = new InitProducerIdHandler(builder);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData));
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING);
@@ -1020,7 +1023,8 @@ public class TransactionManager {
             Errors error = initProducerIdResponse.error();
 
             if (error == Errors.NONE) {
-                ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+                ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data.producerId(),
+                        initProducerIdResponse.data.producerEpoch());
                 setProducerIdAndEpoch(producerIdAndEpoch);
                 transitionTo(State.READY);
                 lastError = null;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index f49c99a..33d6736 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -81,8 +83,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
-import org.apache.kafka.common.requests.InitProducerIdRequest;
-import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.requests.LeaderAndIsrResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
@@ -155,8 +155,7 @@ public enum ApiKeys {
     CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS),
     DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS),
     DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
-    INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(),
-            InitProducerIdResponse.schemaVersions()),
+    INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequestData.SCHEMAS, InitProducerIdResponseData.SCHEMAS),
     OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false, OffsetsForLeaderEpochRequest.schemaVersions(),
             OffsetsForLeaderEpochResponse.schemaVersions()),
     ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 239024f..c069bc9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -76,11 +76,13 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
     }
 
     private final short version;
+    public final ApiKeys api;
 
     public AbstractRequest(ApiKeys api, short version) {
         if (!api.isVersionSupported(version))
             throw new UnsupportedVersionException("The " + api + " protocol does not support version " + version);
         this.version = version;
+        this.api = api;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index f594f20..50ae0b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -115,7 +115,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case DELETE_RECORDS:
                 return new DeleteRecordsResponse(struct);
             case INIT_PRODUCER_ID:
-                return new InitProducerIdResponse(struct);
+                return new InitProducerIdResponse(struct, version);
             case OFFSET_FOR_LEADER_EPOCH:
                 return new OffsetsForLeaderEpochResponse(struct);
             case ADD_PARTITIONS_TO_TXN:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 93f7ab2..a2cd17d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -17,12 +17,12 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
@@ -50,7 +50,6 @@ public class CreateTopicsRequest extends AbstractRequest {
     }
 
     private final CreateTopicsRequestData data;
-    private final short version;
 
     public static final int NO_NUM_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
@@ -58,13 +57,11 @@ public class CreateTopicsRequest extends AbstractRequest {
     private CreateTopicsRequest(CreateTopicsRequestData data, short version) {
         super(ApiKeys.CREATE_TOPICS, version);
         this.data = data;
-        this.version = version;
     }
 
     public CreateTopicsRequest(Struct struct, short version) {
         super(ApiKeys.CREATE_TOPICS, version);
         this.data = new CreateTopicsRequestData(struct, version);
-        this.version = version;
     }
 
     public CreateTopicsRequestData data() {
@@ -96,6 +93,6 @@ public class CreateTopicsRequest extends AbstractRequest {
      */
     @Override
     public Struct toStruct() {
-        return data.toStruct(version);
+        return data.toStruct(version());
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 04bfee8..ed417c5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -86,14 +86,14 @@ public class DescribeAclsRequest extends AbstractRequest {
     private final AclBindingFilter filter;
 
     DescribeAclsRequest(AclBindingFilter filter, short version) {
-        super(ApiKeys.DELETE_ACLS, version);
+        super(ApiKeys.DESCRIBE_ACLS, version);
         this.filter = filter;
 
         validate(filter, version);
     }
 
     public DescribeAclsRequest(Struct struct, short version) {
-        super(ApiKeys.DELETE_ACLS, version);
+        super(ApiKeys.DESCRIBE_ACLS, version);
         ResourcePatternFilter resourceFilter = RequestUtils.resourcePatternFilterFromStructFields(struct);
         AccessControlEntryFilter entryFilter = RequestUtils.aceFilterFromStructFields(struct);
         this.filter = new AclBindingFilter(resourceFilter, entryFilter);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
index aab7c72..8351c21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
@@ -16,106 +16,71 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class InitProducerIdRequest extends AbstractRequest {
-    public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
-
-    private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
-
-    private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
-            NULLABLE_TRANSACTIONAL_ID,
-            new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema INIT_PRODUCER_ID_REQUEST_V1 = INIT_PRODUCER_ID_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{INIT_PRODUCER_ID_REQUEST_V0, INIT_PRODUCER_ID_REQUEST_V1};
-    }
-
-    private final String transactionalId;
-    private final int transactionTimeoutMs;
-
     public static class Builder extends AbstractRequest.Builder<InitProducerIdRequest> {
-        private final String transactionalId;
-        private final int transactionTimeoutMs;
-
-        public Builder(String transactionalId) {
-            this(transactionalId, NO_TRANSACTION_TIMEOUT_MS);
-        }
+        private final InitProducerIdRequestData data;
 
-        public Builder(String transactionalId, int transactionTimeoutMs) {
+        public Builder(InitProducerIdRequestData data) {
             super(ApiKeys.INIT_PRODUCER_ID);
-
-            if (transactionTimeoutMs <= 0)
-                throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs);
-
-            if (transactionalId != null && transactionalId.isEmpty())
-                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
-
-            this.transactionalId = transactionalId;
-            this.transactionTimeoutMs = transactionTimeoutMs;
+            this.data = data;
         }
 
         @Override
         public InitProducerIdRequest build(short version) {
-            return new InitProducerIdRequest(version, transactionalId, transactionTimeoutMs);
+            if (data.transactionTimeoutMs() <= 0)
+                throw new IllegalArgumentException("transaction timeout value is not positive: " + data.transactionTimeoutMs());
+
+            if (data.transactionalId() != null && data.transactionalId().isEmpty())
+                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+
+            return new InitProducerIdRequest(data, version);
         }
 
         @Override
         public String toString() {
-            return "(type=InitProducerIdRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
-                    transactionTimeoutMs + ")";
+            return data.toString();
         }
     }
 
-    public InitProducerIdRequest(Struct struct, short version) {
+    public final InitProducerIdRequestData data;
+
+    private InitProducerIdRequest(InitProducerIdRequestData data, short version) {
         super(ApiKeys.INIT_PRODUCER_ID, version);
-        this.transactionalId = struct.get(NULLABLE_TRANSACTIONAL_ID);
-        this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
+        this.data = data;
     }
 
-    private InitProducerIdRequest(short version, String transactionalId, int transactionTimeoutMs) {
+    public InitProducerIdRequest(Struct struct, short version) {
         super(ApiKeys.INIT_PRODUCER_ID, version);
-        this.transactionalId = transactionalId;
-        this.transactionTimeoutMs = transactionTimeoutMs;
+        this.data = new InitProducerIdRequestData(struct, version);
     }
 
+
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new InitProducerIdResponse(throttleTimeMs, Errors.forException(e));
+        InitProducerIdResponseData response = new InitProducerIdResponseData()
+                .setErrorCode(Errors.forException(e).code())
+                .setProducerId(RecordBatch.NO_PRODUCER_ID)
+                .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
+                .setThrottleTimeMs(0);
+        return new InitProducerIdResponse(response);
     }
 
     public static InitProducerIdRequest parse(ByteBuffer buffer, short version) {
         return new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
     }
 
-    public String transactionalId() {
-        return transactionalId;
-    }
-
-    public int transactionTimeoutMs() {
-        return transactionTimeoutMs;
-    }
-
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
-        struct.set(NULLABLE_TRANSACTIONAL_ID, transactionalId);
-        struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
-        return struct;
+        return data.toStruct(version());
     }
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 9a1e0f7..a33daf3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -16,110 +16,59 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-
+/**
+ * Possible error codes:
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
+ * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+ */
 public class InitProducerIdResponse extends AbstractResponse {
-    // Possible error codes:
-    //   NotCoordinator
-    //   CoordinatorNotAvailable
-    //   CoordinatorLoadInProgress
-    //   TransactionalIdAuthorizationFailed
-    //   ClusterAuthorizationFailed
-
-    private static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE,
-            PRODUCER_ID,
-            PRODUCER_EPOCH);
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema INIT_PRODUCER_ID_RESPONSE_V1 = INIT_PRODUCER_ID_RESPONSE_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0, INIT_PRODUCER_ID_RESPONSE_V1};
-    }
-
-    private final int throttleTimeMs;
-    private final Errors error;
-    private final long producerId;
-    private final short epoch;
+    public final InitProducerIdResponseData data;
 
-    public InitProducerIdResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.producerId = producerId;
-        this.epoch = epoch;
+    public InitProducerIdResponse(InitProducerIdResponseData data) {
+        this.data = data;
     }
 
-    public InitProducerIdResponse(Struct struct) {
-        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        this.error = Errors.forCode(struct.get(ERROR_CODE));
-        this.producerId = struct.get(PRODUCER_ID);
-        this.epoch = struct.get(PRODUCER_EPOCH);
-    }
-
-    public InitProducerIdResponse(int throttleTimeMs, Errors errors) {
-        this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    public InitProducerIdResponse(Struct struct, short version) {
+        this.data = new InitProducerIdResponseData(struct, version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public long producerId() {
-        return producerId;
-    }
-
-    public Errors error() {
-        return error;
+        return data.throttleTimeMs();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
-    }
-
-    public short epoch() {
-        return epoch;
+        return errorCounts(Errors.forCode(data.errorCode()));
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
-        struct.set(PRODUCER_ID, producerId);
-        struct.set(PRODUCER_EPOCH, epoch);
-        struct.set(ERROR_CODE, error.code());
-        return struct;
+        return data.toStruct(version);
     }
 
     public static InitProducerIdResponse parse(ByteBuffer buffer, short version) {
-        return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
+        return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer), version);
     }
 
     @Override
     public String toString() {
-        return "InitProducerIdResponse(" +
-                "error=" + error +
-                ", producerId=" + producerId +
-                ", producerEpoch=" + epoch +
-                ", throttleTimeMs=" + throttleTimeMs +
-                ')';
+        return data.toString();
+    }
+
+    public Errors error() {
+        return Errors.forCode(data.errorCode());
     }
 
     @Override
diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json
index 8bf2ce3..c8ca110 100644
--- a/clients/src/main/resources/common/message/InitProducerIdRequest.json
+++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json
@@ -23,6 +23,6 @@
     { "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+",
       "about": "The transactional id, or null if the producer is not transactional." },
     { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
-      "about": "The time in ms to wait for before aborting idle transactions sent by this producer." }
+      "about": "The time in ms to wait for before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/InitProducerIdResponse.json b/clients/src/main/resources/common/message/InitProducerIdResponse.json
index b251051..a52fc81 100644
--- a/clients/src/main/resources/common/message/InitProducerIdResponse.json
+++ b/clients/src/main/resources/common/message/InitProducerIdResponse.json
@@ -25,7 +25,7 @@
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
     { "name": "ProducerId", "type": "int64", "versions": "0+",
-      "about": "The current producer id." },
+      "default": -1, "about": "The current producer id." },
     { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "The current epoch associated with the producer id." }
   ]
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 4cbdaa2..d397fd4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -108,7 +109,6 @@ import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.spy;
 
 public class SenderTest {
-
     private static final int MAX_REQUEST_SIZE = 1024 * 1024;
     private static final short ACKS_ALL = -1;
     private static final String CLIENT_ID = "clientId";
@@ -2321,15 +2321,22 @@ public class SenderTest {
         if (error != Errors.NONE)
             producerEpoch = RecordBatch.NO_PRODUCER_EPOCH;
 
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                return body instanceof InitProducerIdRequest && ((InitProducerIdRequest) body).transactionalId() == null;
-            }
-        }, new InitProducerIdResponse(0, error, producerId, producerEpoch));
+        client.prepareResponse(body -> {
+            return body instanceof InitProducerIdRequest &&
+                    ((InitProducerIdRequest) body).data.transactionalId() == null;
+        }, initProducerIdResponse(producerId, producerEpoch, error));
         sender.run(time.milliseconds());
     }
 
+    private InitProducerIdResponse initProducerIdResponse(long producerId, short producerEpoch, Errors error) {
+        InitProducerIdResponseData responseData = new InitProducerIdResponseData()
+                .setErrorCode(error.code())
+                .setProducerEpoch(producerEpoch)
+                .setProducerId(producerId)
+                .setThrottleTimeMs(0);
+        return new InitProducerIdResponse(responseData);
+    }
+
     private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
         transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE);
@@ -2345,8 +2352,8 @@ public class SenderTest {
         client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
     }
 
-    private void prepareInitPidResponse(Errors error, long pid, short epoch) {
-        client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch));
+    private void prepareInitPidResponse(Errors error, long producerId, short producerEpoch) {
+        client.prepareResponse(initProducerIdResponse(producerId, producerEpoch, error));
     }
 
     private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType)
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 97f7f5d..1c47b9d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -703,8 +704,8 @@ public class TransactionManagerTest {
 
         client.prepareUnsupportedVersionResponse(body -> {
             InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
-            assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
-            assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
+            assertEquals(initProducerIdRequest.data.transactionalId(), transactionalId);
+            assertEquals(initProducerIdRequest.data.transactionTimeoutMs(), transactionTimeoutMs);
             return true;
         });
 
@@ -2381,21 +2382,26 @@ public class TransactionManagerTest {
         }, new FindCoordinatorResponse(error, brokerNode), shouldDisconnect);
     }
 
-    private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long pid, short epoch) {
+    private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
+        InitProducerIdResponseData responseData = new InitProducerIdResponseData()
+                .setErrorCode(error.code())
+                .setProducerEpoch(producerEpoch)
+                .setProducerId(producerId)
+                .setThrottleTimeMs(0);
         client.prepareResponse(body -> {
             InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
-            assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
-            assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
+            assertEquals(initProducerIdRequest.data.transactionalId(), transactionalId);
+            assertEquals(initProducerIdRequest.data.transactionTimeoutMs(), transactionTimeoutMs);
             return true;
-        }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
+        }, new InitProducerIdResponse(responseData), shouldDisconnect);
     }
 
-    private void sendProduceResponse(Errors error, final long pid, final short epoch) {
-        client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+    private void sendProduceResponse(Errors error, final long producerId, final short producerEpoch) {
+        client.respond(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0));
     }
 
-    private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
-        client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+    private void prepareProduceResponse(Errors error, final long producerId, final short producerEpoch) {
+        client.prepareResponse(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0));
     }
     private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) {
         return body -> {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index dfdc323..ca695c7 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -52,6 +52,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPar
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -389,30 +391,39 @@ public class RequestResponseTest {
         checkResponse(req.getErrorResponse(e), req.version());
     }
 
-    private void checkRequest(AbstractRequest req) throws Exception {
+    private void checkRequest(AbstractRequest req) {
         // Check that we can serialize, deserialize and serialize again
         // We don't check for equality or hashCode because it is likely to fail for any request containing a HashMap
         checkRequest(req, false);
     }
 
-    private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) throws Exception {
+    private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) {
         // Check that we can serialize, deserialize and serialize again
         // Check for equality and hashCode only if indicated
-        Struct struct = req.toStruct();
-        AbstractRequest deserialized = (AbstractRequest) deserialize(req, struct, req.version());
-        Struct struct2 = deserialized.toStruct();
-        if (checkEqualityAndHashCode) {
-            assertEquals(struct, struct2);
-            assertEquals(struct.hashCode(), struct2.hashCode());
+        try {
+            Struct struct = req.toStruct();
+            AbstractRequest deserialized = AbstractRequest.parseRequest(req.api, req.version(), struct);
+            Struct struct2 = deserialized.toStruct();
+            if (checkEqualityAndHashCode) {
+                assertEquals(struct, struct2);
+                assertEquals(struct.hashCode(), struct2.hashCode());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to deserialize request " + req + " with type " + req.getClass(), e);
         }
     }
 
     private void checkResponse(AbstractResponse response, int version) throws Exception {
         // Check that we can serialize, deserialize and serialize again
         // We don't check for equality or hashCode because it is likely to fail for any response containing a HashMap
-        Struct struct = response.toStruct((short) version);
-        AbstractResponse deserialized = (AbstractResponse) deserialize(response, struct, (short) version);
-        Struct struct2 = deserialized.toStruct((short) version);
+        try {
+            Struct struct = response.toStruct((short) version);
+            AbstractResponse deserialized = (AbstractResponse) deserialize(response, struct, (short) version);
+            Struct struct2 = deserialized.toStruct((short) version);
+            assertEquals(struct2, struct);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to deserialize response " + response + " with type " + response.getClass(), e);
+        }
     }
 
     private AbstractRequestResponse deserialize(AbstractRequestResponse req, Struct struct, short version) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
@@ -1167,14 +1178,21 @@ public class RequestResponseTest {
     }
 
     private InitProducerIdRequest createInitPidRequest() {
-        return new InitProducerIdRequest.Builder(null, 100).build();
+        InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                .setTransactionalId(null)
+                .setTransactionTimeoutMs(100);
+        return new InitProducerIdRequest.Builder(requestData).build();
     }
 
     private InitProducerIdResponse createInitPidResponse() {
-        return new InitProducerIdResponse(0, Errors.NONE, 3332, (short) 3);
+        InitProducerIdResponseData responseData = new InitProducerIdResponseData()
+                .setErrorCode(Errors.NONE.code())
+                .setProducerEpoch((short) 3)
+                .setProducerId(3332)
+                .setThrottleTimeMs(0);
+        return new InitProducerIdResponse(responseData);
     }
 
-
     private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
         Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>();
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7d12fe3..140fdd4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,16 +45,9 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.CreateTopicsResponseData
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
-import org.apache.kafka.common.message.DeleteTopicsResponseData
+import org.apache.kafka.common.message.{CreateTopicsResponseData, DeleteTopicsResponseData, DescribeGroupsResponseData, ElectPreferredLeadersResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData}
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet}
-import org.apache.kafka.common.message.DescribeGroupsResponseData
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
-import org.apache.kafka.common.message.JoinGroupResponseData
-import org.apache.kafka.common.message.LeaveGroupResponseData
-import org.apache.kafka.common.message.SaslAuthenticateResponseData
-import org.apache.kafka.common.message.SaslHandshakeResponseData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -1680,7 +1673,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
     val initProducerIdRequest = request.body[InitProducerIdRequest]
-    val transactionalId = initProducerIdRequest.transactionalId
+    val transactionalId = initProducerIdRequest.data.transactionalId
 
     if (transactionalId != null) {
       if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL))) {
@@ -1694,13 +1687,18 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     def sendResponseCallback(result: InitProducerIdResult): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
+        val responseData = new InitProducerIdResponseData()
+          .setProducerId(result.producerId)
+          .setProducerEpoch(result.producerEpoch)
+          .setThrottleTimeMs(requestThrottleMs)
+          .setErrorCode(result.error.code)
+        val responseBody = new InitProducerIdResponse(responseData)
         trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
     }
-    txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
+    txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.data.transactionTimeoutMs, sendResponseCallback)
   }
 
   def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d04f39f..05f4bcd 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,13 +24,11 @@ import kafka.security.auth._
 import kafka.utils.TestUtils
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, InitProducerIdRequestData, JoinGroupRequestData, LeaveGroupRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.ControlledShutdownRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
-import org.apache.kafka.common.message.SaslAuthenticateRequestData
-import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
@@ -321,7 +319,10 @@ class RequestQuotaTest extends BaseRequestTest {
           new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
 
         case ApiKeys.INIT_PRODUCER_ID =>
-          new InitProducerIdRequest.Builder("abc")
+          val requestData = new InitProducerIdRequestData()
+            .setTransactionalId("test-transactional-id")
+            .setTransactionTimeoutMs(5000)
+          new InitProducerIdRequest.Builder(requestData)
 
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
           new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
@@ -463,7 +464,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.FETCH => FetchResponse.parse(response).throttleTimeMs
       case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
       case ApiKeys.METADATA =>
-        new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
+        new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
       case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
       case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
       case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
@@ -472,15 +473,15 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs
       case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
       case ApiKeys.DESCRIBE_GROUPS =>
-        new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
+        new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
       case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
       case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
       case ApiKeys.CREATE_TOPICS =>
-        new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion()).throttleTimeMs
+        new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion).throttleTimeMs
       case ApiKeys.DELETE_TOPICS => 
-        new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion()).throttleTimeMs
+        new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion).throttleTimeMs
       case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
-      case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs
+      case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response, ApiKeys.INIT_PRODUCER_ID.latestVersion).throttleTimeMs
       case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
       case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
       case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs