You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/08/07 08:02:54 UTC
[kafka] branch trunk updated: KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 926fb35 KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken
926fb35 is described below
commit 926fb35d9dcefd45c1e1d276ee7252b15875f23e
Author: Mickael Maison <mi...@gmail.com>
AuthorDate: Wed Aug 7 13:32:26 2019 +0530
KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken
Author: Mickael Maison <mi...@gmail.com>
Reviewers: Manikumar Reddy <ma...@gmail.com>, Viktor Somogyi <vi...@gmail.com>
Closes #7098 from mimaison/KAFKA-8599
---
.../kafka/clients/admin/KafkaAdminClient.java | 8 ++-
.../org/apache/kafka/common/protocol/ApiKeys.java | 6 +-
.../apache/kafka/common/protocol/types/Struct.java | 1 +
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../requests/ExpireDelegationTokenRequest.java | 77 +++++++---------------
.../requests/ExpireDelegationTokenResponse.java | 70 +++++---------------
.../kafka/common/requests/RequestResponseTest.java | 13 +++-
core/src/main/scala/kafka/server/KafkaApis.scala | 7 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 8 ++-
9 files changed, 73 insertions(+), 119 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8092eec..5256e36 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -73,6 +73,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
@@ -2473,8 +2474,11 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs());
+ AbstractRequest.Builder<ExpireDelegationTokenRequest> createRequest(int timeoutMs) {
+ return new ExpireDelegationTokenRequest.Builder(
+ new ExpireDelegationTokenRequestData()
+ .setHmac(hmac)
+ .setExpiryTimePeriodMs(options.expiryTimePeriodMs()));
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 3f0f5e8..e05f692 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -30,6 +30,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -93,8 +95,6 @@ import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
-import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
-import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
@@ -191,7 +191,7 @@ public enum ApiKeys {
CreatePartitionsResponse.schemaVersions()),
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS),
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
- EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
+ EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS, ExpireDelegationTokenResponseData.SCHEMAS),
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequestData.SCHEMAS, DeleteGroupsResponseData.SCHEMAS),
ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 3114aea..e47a2cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -290,6 +290,7 @@ public class Struct {
ByteBuffer buf = (ByteBuffer) result;
byte[] arr = new byte[buf.remaining()];
buf.get(arr);
+ buf.flip();
return arr;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index eb52fb8..da2b837 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -151,7 +151,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case RENEW_DELEGATION_TOKEN:
return new RenewDelegationTokenResponse(struct);
case EXPIRE_DELEGATION_TOKEN:
- return new ExpireDelegationTokenResponse(struct);
+ return new ExpireDelegationTokenResponse(struct, version);
case DESCRIBE_DELEGATION_TOKEN:
return new DescribeDelegationTokenResponse(struct);
case DELETE_GROUPS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 5b99676..ca6d2d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -16,102 +16,69 @@
*/
package org.apache.kafka.common.requests;
+import java.nio.ByteBuffer;
+
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import java.nio.ByteBuffer;
-
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-
public class ExpireDelegationTokenRequest extends AbstractRequest {
- private static final String HMAC_KEY_NAME = "hmac";
- private static final String EXPIRY_TIME_PERIOD_KEY_NAME = "expiry_time_period";
- private final ByteBuffer hmac;
- private final long expiryTimePeriod;
-
- private static final Schema TOKEN_EXPIRE_REQUEST_V0 = new Schema(
- new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be expired."),
- new Field(EXPIRY_TIME_PERIOD_KEY_NAME, INT64, "expiry time period in milli seconds."));
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema TOKEN_EXPIRE_REQUEST_V1 = TOKEN_EXPIRE_REQUEST_V0;
+ private final ExpireDelegationTokenRequestData data;
- private ExpireDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) {
+ private ExpireDelegationTokenRequest(ExpireDelegationTokenRequestData data, short version) {
super(ApiKeys.EXPIRE_DELEGATION_TOKEN, version);
-
- this.hmac = hmac;
- this.expiryTimePeriod = renewTimePeriod;
+ this.data = data;
}
- public ExpireDelegationTokenRequest(Struct struct, short versionId) {
- super(ApiKeys.EXPIRE_DELEGATION_TOKEN, versionId);
-
- hmac = struct.getBytes(HMAC_KEY_NAME);
- expiryTimePeriod = struct.getLong(EXPIRY_TIME_PERIOD_KEY_NAME);
+ public ExpireDelegationTokenRequest(Struct struct, short version) {
+ super(ApiKeys.EXPIRE_DELEGATION_TOKEN, version);
+ this.data = new ExpireDelegationTokenRequestData(struct, version);
}
public static ExpireDelegationTokenRequest parse(ByteBuffer buffer, short version) {
return new ExpireDelegationTokenRequest(ApiKeys.EXPIRE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
}
- public static Schema[] schemaVersions() {
- return new Schema[] {TOKEN_EXPIRE_REQUEST_V0, TOKEN_EXPIRE_REQUEST_V1};
- }
-
@Override
protected Struct toStruct() {
- short version = version();
- Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.requestSchema(version));
-
- struct.set(HMAC_KEY_NAME, hmac);
- struct.set(EXPIRY_TIME_PERIOD_KEY_NAME, expiryTimePeriod);
-
- return struct;
+ return data.toStruct(version());
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- return new ExpireDelegationTokenResponse(throttleTimeMs, Errors.forException(e));
+ return new ExpireDelegationTokenResponse(
+ new ExpireDelegationTokenResponseData()
+ .setErrorCode(Errors.forException(e).code())
+ .setThrottleTimeMs(throttleTimeMs));
}
public ByteBuffer hmac() {
- return hmac;
+ return ByteBuffer.wrap(data.hmac());
}
public long expiryTimePeriod() {
- return expiryTimePeriod;
+ return data.expiryTimePeriodMs();
}
public static class Builder extends AbstractRequest.Builder<ExpireDelegationTokenRequest> {
- private final ByteBuffer hmac;
- private final long expiryTimePeriod;
+ private final ExpireDelegationTokenRequestData data;
- public Builder(byte[] hmac, long expiryTimePeriod) {
+ public Builder(ExpireDelegationTokenRequestData data) {
super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
- this.hmac = ByteBuffer.wrap(hmac);
- this.expiryTimePeriod = expiryTimePeriod;
+ this.data = data;
}
@Override
public ExpireDelegationTokenRequest build(short version) {
- return new ExpireDelegationTokenRequest(version, hmac, expiryTimePeriod);
+ return new ExpireDelegationTokenRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type: ExpireDelegationTokenRequest").
- append(", hmac=").append(hmac).
- append(", expiryTimePeriod=").append(expiryTimePeriod).
- append(")");
- return bld.toString();
+ return data.toString();
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
index 9491a35..16a6e8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -16,92 +16,56 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
public class ExpireDelegationTokenResponse extends AbstractResponse {
- private static final String EXPIRY_TIMESTAMP_KEY_NAME = "expiry_timestamp";
-
- private final Errors error;
- private final long expiryTimestamp;
- private final int throttleTimeMs;
+ private final ExpireDelegationTokenResponseData data;
- private static final Schema TOKEN_EXPIRE_RESPONSE_V0 = new Schema(
- ERROR_CODE,
- new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
- THROTTLE_TIME_MS);
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema TOKEN_EXPIRE_RESPONSE_V1 = TOKEN_EXPIRE_RESPONSE_V0;
-
- public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
- this.throttleTimeMs = throttleTimeMs;
- this.error = error;
- this.expiryTimestamp = expiryTimestamp;
- }
-
- public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error) {
- this(throttleTimeMs, error, -1);
+ public ExpireDelegationTokenResponse(ExpireDelegationTokenResponseData data) {
+ this.data = data;
}
- public ExpireDelegationTokenResponse(Struct struct) {
- error = Errors.forCode(struct.get(ERROR_CODE));
- this.expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_KEY_NAME);
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+ public ExpireDelegationTokenResponse(Struct struct, short version) {
+ this.data = new ExpireDelegationTokenResponseData(struct, version);
}
public static ExpireDelegationTokenResponse parse(ByteBuffer buffer, short version) {
- return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
- }
-
- public static Schema[] schemaVersions() {
- return new Schema[] {TOKEN_EXPIRE_RESPONSE_V0, TOKEN_EXPIRE_RESPONSE_V1};
+ return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer), version);
}
public Errors error() {
- return error;
+ return Errors.forCode(data.errorCode());
}
public long expiryTimestamp() {
- return expiryTimestamp;
+ return data.expiryTimestampMs();
}
@Override
public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
+ return Collections.singletonMap(error(), 1);
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version));
-
- struct.set(ERROR_CODE, error.code());
- struct.set(EXPIRY_TIMESTAMP_KEY_NAME, expiryTimestamp);
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
- return struct;
+ return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
+ return data.throttleTimeMs();
}
public boolean hasError() {
- return this.error != Errors.NONE;
+ return error() != Errors.NONE;
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 4218eff..0b8d98d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -58,6 +58,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -1551,11 +1553,18 @@ public class RequestResponseTest {
}
private ExpireDelegationTokenRequest createExpireTokenRequest() {
- return new ExpireDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
+ ExpireDelegationTokenRequestData data = new ExpireDelegationTokenRequestData()
+ .setHmac("test".getBytes())
+ .setExpiryTimePeriodMs(System.currentTimeMillis());
+ return new ExpireDelegationTokenRequest.Builder(data).build();
}
private ExpireDelegationTokenResponse createExpireTokenResponse() {
- return new ExpireDelegationTokenResponse(20, Errors.NONE, System.currentTimeMillis());
+ ExpireDelegationTokenResponseData data = new ExpireDelegationTokenResponseData()
+ .setThrottleTimeMs(20)
+ .setErrorCode(Errors.NONE.code())
+ .setExpiryTimestampMs(System.currentTimeMillis());
+ return new ExpireDelegationTokenResponse(data);
}
private DescribeDelegationTokenRequest createDescribeTokenRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a88cd92..2b49982 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -59,6 +59,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicR
import org.apache.kafka.common.message.DescribeGroupsResponseData
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData
import org.apache.kafka.common.message.FindCoordinatorResponseData
import org.apache.kafka.common.message.HeartbeatResponseData
import org.apache.kafka.common.message.InitProducerIdResponseData
@@ -2484,7 +2485,11 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Sending expire token response for correlation id %d to client %s."
.format(request.header.correlationId, request.header.clientId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new ExpireDelegationTokenResponse(requestThrottleMs, error, expiryTimestamp))
+ new ExpireDelegationTokenResponse(
+ new ExpireDelegationTokenResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(error.code)
+ .setExpiryTimestampMs(expiryTimestamp)))
}
if (!allowTokenRequests(request))
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index a8d29fd..db7ab69 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
import org.apache.kafka.common.message.DeleteGroupsRequestData
import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.message.DescribeGroupsRequestData
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData
import org.apache.kafka.common.message.FindCoordinatorRequestData
import org.apache.kafka.common.message.HeartbeatRequestData
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
@@ -444,7 +445,10 @@ class RequestQuotaTest extends BaseRequestTest {
)
case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
- new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
+ new ExpireDelegationTokenRequest.Builder(
+ new ExpireDelegationTokenRequestData()
+ .setHmac("".getBytes)
+ .setExpiryTimePeriodMs(1000L))
case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
@@ -573,7 +577,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response, ApiKeys.CREATE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
- case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
+ case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response, ApiKeys.EXPIRE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs