You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/08/09 07:20:22 UTC
[kafka] branch trunk updated: KAFKA-8598: Use automatic RPC
generation in RenewDelegationToken
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5e543ae KAFKA-8598: Use automatic RPC generation in RenewDelegationToken
5e543ae is described below
commit 5e543ae10832808a9d7efef363603f07695d4368
Author: Mickael Maison <mi...@gmail.com>
AuthorDate: Fri Aug 9 12:49:38 2019 +0530
KAFKA-8598: Use automatic RPC generation in RenewDelegationToken
Author: Mickael Maison <mi...@gmail.com>
Reviewers: Manikumar Reddy <ma...@gmail.com>, Viktor Somogyi <vi...@gmail.com>
Closes #7038 from mimaison/KAFKA-8598
---
.../kafka/clients/admin/KafkaAdminClient.java | 8 ++-
.../org/apache/kafka/common/protocol/ApiKeys.java | 6 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../requests/RenewDelegationTokenRequest.java | 83 ++++++----------------
.../requests/RenewDelegationTokenResponse.java | 70 +++++-------------
.../kafka/common/requests/RequestResponseTest.java | 14 +++-
core/src/main/scala/kafka/server/KafkaApis.scala | 11 ++-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 8 ++-
8 files changed, 75 insertions(+), 127 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5256e36..7bcad41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -82,6 +82,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterC
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -2443,8 +2444,11 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new RenewDelegationTokenRequest.Builder(hmac, options.renewTimePeriodMs());
+ AbstractRequest.Builder<RenewDelegationTokenRequest> createRequest(int timeoutMs) {
+ return new RenewDelegationTokenRequest.Builder(
+ new RenewDelegationTokenRequestData()
+ .setHmac(hmac)
+ .setRenewPeriodMs(options.renewTimePeriodMs()));
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index e05f692..912e26b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -56,6 +56,8 @@ import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -105,8 +107,6 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
-import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
-import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.StopReplicaResponse;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@@ -190,7 +190,7 @@ public enum ApiKeys {
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
CreatePartitionsResponse.schemaVersions()),
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS),
- RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
+ RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequestData.SCHEMAS, RenewDelegationTokenResponseData.SCHEMAS),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS, ExpireDelegationTokenResponseData.SCHEMAS),
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequestData.SCHEMAS, DeleteGroupsResponseData.SCHEMAS),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index da2b837..13d18a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -149,7 +149,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case CREATE_DELEGATION_TOKEN:
return new CreateDelegationTokenResponse(struct, version);
case RENEW_DELEGATION_TOKEN:
- return new RenewDelegationTokenResponse(struct);
+ return new RenewDelegationTokenResponse(struct, version);
case EXPIRE_DELEGATION_TOKEN:
return new ExpireDelegationTokenResponse(struct, version);
case DESCRIBE_DELEGATION_TOKEN:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
index d73561a..2c83f06 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -16,102 +16,65 @@
*/
package org.apache.kafka.common.requests;
+import java.nio.ByteBuffer;
+
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import java.nio.ByteBuffer;
-
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-
public class RenewDelegationTokenRequest extends AbstractRequest {
- private static final String HMAC_KEY_NAME = "hmac";
- private static final String RENEW_TIME_PERIOD_KEY_NAME = "renew_time_period";
- private final ByteBuffer hmac;
- private final long renewTimePeriod;
+ private final RenewDelegationTokenRequestData data;
- public static final Schema TOKEN_RENEW_REQUEST_V0 = new Schema(
- new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be renewed."),
- new Field(RENEW_TIME_PERIOD_KEY_NAME, INT64, "Renew time period in milli seconds."));
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- public static final Schema TOKEN_RENEW_REQUEST_V1 = TOKEN_RENEW_REQUEST_V0;
-
- private RenewDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) {
+ public RenewDelegationTokenRequest(RenewDelegationTokenRequestData data, short version) {
super(ApiKeys.RENEW_DELEGATION_TOKEN, version);
-
- this.hmac = hmac;
- this.renewTimePeriod = renewTimePeriod;
+ this.data = data;
}
- public RenewDelegationTokenRequest(Struct struct, short versionId) {
- super(ApiKeys.RENEW_DELEGATION_TOKEN, versionId);
-
- hmac = struct.getBytes(HMAC_KEY_NAME);
- renewTimePeriod = struct.getLong(RENEW_TIME_PERIOD_KEY_NAME);
+ public RenewDelegationTokenRequest(Struct struct, short version) {
+ super(ApiKeys.RENEW_DELEGATION_TOKEN, version);
+ this.data = new RenewDelegationTokenRequestData(struct, version);
}
public static RenewDelegationTokenRequest parse(ByteBuffer buffer, short version) {
return new RenewDelegationTokenRequest(ApiKeys.RENEW_DELEGATION_TOKEN.parseRequest(version, buffer), version);
}
- public static Schema[] schemaVersions() {
- return new Schema[] {TOKEN_RENEW_REQUEST_V0, TOKEN_RENEW_REQUEST_V1};
- }
-
@Override
protected Struct toStruct() {
- short version = version();
- Struct struct = new Struct(ApiKeys.RENEW_DELEGATION_TOKEN.requestSchema(version));
-
- struct.set(HMAC_KEY_NAME, hmac);
- struct.set(RENEW_TIME_PERIOD_KEY_NAME, renewTimePeriod);
+ return data.toStruct(version());
+ }
- return struct;
+ public RenewDelegationTokenRequestData data() {
+ return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- return new RenewDelegationTokenResponse(throttleTimeMs, Errors.forException(e));
- }
-
- public ByteBuffer hmac() {
- return hmac;
- }
-
- public long renewTimePeriod() {
- return renewTimePeriod;
+ return new RenewDelegationTokenResponse(
+ new RenewDelegationTokenResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(Errors.forException(e).code()));
}
public static class Builder extends AbstractRequest.Builder<RenewDelegationTokenRequest> {
- private final ByteBuffer hmac;
- private final long renewTimePeriod;
+ private final RenewDelegationTokenRequestData data;
- public Builder(byte[] hmac, long renewTimePeriod) {
+ public Builder(RenewDelegationTokenRequestData data) {
super(ApiKeys.RENEW_DELEGATION_TOKEN);
- this.hmac = ByteBuffer.wrap(hmac);
- this.renewTimePeriod = renewTimePeriod;
+ this.data = data;
}
@Override
public RenewDelegationTokenRequest build(short version) {
- return new RenewDelegationTokenRequest(version, hmac, renewTimePeriod);
+ return new RenewDelegationTokenRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type: RenewDelegationTokenRequest").
- append(", hmac=").append(hmac).
- append(", renewTimePeriod=").append(renewTimePeriod).
- append(")");
- return bld.toString();
+ return data.toString();
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
index dc961e1..35cd615 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
@@ -16,92 +16,56 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
public class RenewDelegationTokenResponse extends AbstractResponse {
- private static final String EXPIRY_TIMESTAMP_KEY_NAME = "expiry_timestamp";
-
- private final Errors error;
- private final long expiryTimestamp;
- private final int throttleTimeMs;
+ private final RenewDelegationTokenResponseData data;
- private static final Schema TOKEN_RENEW_RESPONSE_V0 = new Schema(
- ERROR_CODE,
- new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
- THROTTLE_TIME_MS);
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema TOKEN_RENEW_RESPONSE_V1 = TOKEN_RENEW_RESPONSE_V0;
-
- public RenewDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
- this.throttleTimeMs = throttleTimeMs;
- this.error = error;
- this.expiryTimestamp = expiryTimestamp;
- }
-
- public RenewDelegationTokenResponse(int throttleTimeMs, Errors error) {
- this(throttleTimeMs, error, -1);
+ public RenewDelegationTokenResponse(RenewDelegationTokenResponseData data) {
+ this.data = data;
}
- public RenewDelegationTokenResponse(Struct struct) {
- error = Errors.forCode(struct.get(ERROR_CODE));
- expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_KEY_NAME);
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+ public RenewDelegationTokenResponse(Struct struct, short version) {
+ data = new RenewDelegationTokenResponseData(struct, version);
}
public static RenewDelegationTokenResponse parse(ByteBuffer buffer, short version) {
- return new RenewDelegationTokenResponse(ApiKeys.RENEW_DELEGATION_TOKEN.responseSchema(version).read(buffer));
- }
-
- public static Schema[] schemaVersions() {
- return new Schema[] {TOKEN_RENEW_RESPONSE_V0, TOKEN_RENEW_RESPONSE_V1};
+ return new RenewDelegationTokenResponse(ApiKeys.RENEW_DELEGATION_TOKEN.responseSchema(version).read(buffer), version);
}
@Override
public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
+ return Collections.singletonMap(error(), 1);
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.RENEW_DELEGATION_TOKEN.responseSchema(version));
-
- struct.set(ERROR_CODE, error.code());
- struct.set(EXPIRY_TIMESTAMP_KEY_NAME, expiryTimestamp);
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
- return struct;
+ return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
+ return data.throttleTimeMs();
}
public Errors error() {
- return error;
+ return Errors.forCode(data.errorCode());
}
public long expiryTimestamp() {
- return expiryTimestamp;
+ return data.expiryTimestampMs();
}
public boolean hasError() {
- return this.error != Errors.NONE;
+ return error() != Errors.NONE;
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0b8d98d..a979d4c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -82,6 +82,8 @@ import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -1020,7 +1022,6 @@ public class RequestResponseTest {
return MetadataResponse.prepareResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
}
- @SuppressWarnings("deprecation")
private OffsetCommitRequest createOffsetCommitRequest(int version) {
return new OffsetCommitRequest.Builder(new OffsetCommitRequestData()
.setGroupId("group1")
@@ -1545,11 +1546,18 @@ public class RequestResponseTest {
}
private RenewDelegationTokenRequest createRenewTokenRequest() {
- return new RenewDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
+ RenewDelegationTokenRequestData data = new RenewDelegationTokenRequestData()
+ .setHmac("test".getBytes())
+ .setRenewPeriodMs(System.currentTimeMillis());
+ return new RenewDelegationTokenRequest.Builder(data).build();
}
private RenewDelegationTokenResponse createRenewTokenResponse() {
- return new RenewDelegationTokenResponse(20, Errors.NONE, System.currentTimeMillis());
+ RenewDelegationTokenResponseData data = new RenewDelegationTokenResponseData()
+ .setThrottleTimeMs(20)
+ .setErrorCode(Errors.NONE.code())
+ .setExpiryTimestampMs(System.currentTimeMillis());
+ return new RenewDelegationTokenResponse(data);
}
private ExpireDelegationTokenRequest createExpireTokenRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2b49982..b71e4b0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -69,6 +69,7 @@ import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.ListGroupsResponseData
import org.apache.kafka.common.message.OffsetCommitRequestData
import org.apache.kafka.common.message.OffsetCommitResponseData
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData
import org.apache.kafka.common.message.SaslAuthenticateResponseData
import org.apache.kafka.common.message.SaslHandshakeResponseData
import org.apache.kafka.common.message.SyncGroupResponseData
@@ -2462,7 +2463,11 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Sending renew token response %s for correlation id %d to client %s."
.format(request.header.correlationId, request.header.clientId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new RenewDelegationTokenResponse(requestThrottleMs, error, expiryTimestamp))
+ new RenewDelegationTokenResponse(
+ new RenewDelegationTokenResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(error.code)
+ .setExpiryTimestampMs(expiryTimestamp)))
}
if (!allowTokenRequests(request))
@@ -2470,8 +2475,8 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
tokenManager.renewToken(
request.session.principal,
- renewTokenRequest.hmac,
- renewTokenRequest.renewTimePeriod(),
+ ByteBuffer.wrap(renewTokenRequest.data.hmac),
+ renewTokenRequest.data.renewPeriodMs,
sendResponseCallback
)
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index db7ab69..d5c5d7f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -45,6 +45,7 @@ import org.apache.kafka.common.message.ListGroupsRequestData
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData
import org.apache.kafka.common.message.OffsetCommitRequestData
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData
import org.apache.kafka.common.message.SaslAuthenticateRequestData
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.message.SyncGroupRequestData
@@ -454,7 +455,10 @@ class RequestQuotaTest extends BaseRequestTest {
new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
case ApiKeys.RENEW_DELEGATION_TOKEN =>
- new RenewDelegationTokenRequest.Builder("".getBytes, 1000)
+ new RenewDelegationTokenRequest.Builder(
+ new RenewDelegationTokenRequestData()
+ .setHmac("".getBytes)
+ .setRenewPeriodMs(1000L))
case ApiKeys.DELETE_GROUPS =>
new DeleteGroupsRequest.Builder(new DeleteGroupsRequestData()
@@ -577,8 +581,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response, ApiKeys.CREATE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
+ case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response, ApiKeys.RENEW_DELEGATION_TOKEN.latestVersion).throttleTimeMs
case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response, ApiKeys.EXPIRE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
- case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs
case ApiKeys.ELECT_LEADERS => new ElectLeadersResponse(response).throttleTimeMs