You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/06/25 13:01:28 UTC

[kafka] branch trunk updated: KAFKA-8390: Use automatic RPC generation in CreateDelegationToken (#6828)

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 14d8549  KAFKA-8390: Use automatic RPC generation in CreateDelegationToken (#6828)
14d8549 is described below

commit 14d854936e1d2fed2e69a7c6367becf360f88833
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Tue Jun 25 15:01:11 2019 +0200

    KAFKA-8390: Use automatic RPC generation in CreateDelegationToken (#6828)
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  24 ++-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   6 +-
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../requests/CreateDelegationTokenRequest.java     |  94 ++----------
 .../requests/CreateDelegationTokenResponse.java    | 164 ++++++---------------
 .../kafka/common/requests/RequestResponseTest.java |  30 +++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  15 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  11 +-
 8 files changed, 122 insertions(+), 224 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e01ecf3..3fde51d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -61,6 +61,9 @@ import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
@@ -138,6 +141,7 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -2389,12 +2393,21 @@ public class KafkaAdminClient extends AdminClient {
     public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) {
         final KafkaFutureImpl<DelegationToken> delegationTokenFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
+        List<CreatableRenewers> renewers = new ArrayList<>();
+        for (KafkaPrincipal principal : options.renewers()) {
+            renewers.add(new CreatableRenewers()
+                    .setPrincipalName(principal.getName())
+                    .setPrincipalType(principal.getPrincipalType()));
+        }
         runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
 
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new CreateDelegationTokenRequest.Builder(options.renewers(), options.maxlifeTimeMs());
+            AbstractRequest.Builder<CreateDelegationTokenRequest> createRequest(int timeoutMs) {
+                return new CreateDelegationTokenRequest.Builder(
+                        new CreateDelegationTokenRequestData()
+                            .setRenewers(renewers)
+                            .setMaxLifetimeMs(options.maxlifeTimeMs()));
             }
 
             @Override
@@ -2403,9 +2416,10 @@ public class KafkaAdminClient extends AdminClient {
                 if (response.hasError()) {
                     delegationTokenFuture.completeExceptionally(response.error().exception());
                 } else {
-                    TokenInformation tokenInfo =  new TokenInformation(response.tokenId(), response.owner(),
-                        options.renewers(), response.issueTimestamp(), response.maxTimestamp(), response.expiryTimestamp());
-                    DelegationToken token = new DelegationToken(tokenInfo, response.hmacBytes());
+                    CreateDelegationTokenResponseData data = response.data();
+                    TokenInformation tokenInfo =  new TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(), data.principalName()),
+                        options.renewers(), data.issueTimestampMs(), data.maxTimestampMs(), data.expiryTimestampMs());
+                    DelegationToken token = new DelegationToken(tokenInfo, data.hmac());
                     delegationTokenFuture.complete(token);
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 849f268..257393b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.message.ControlledShutdownRequestData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
@@ -65,8 +67,6 @@ import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsResponse;
-import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
-import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
@@ -185,7 +185,7 @@ public enum ApiKeys {
             SaslAuthenticateResponseData.SCHEMAS),
     CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
             CreatePartitionsResponse.schemaVersions()),
-    CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()),
+    CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS),
     RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
     EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
     DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 6d07431..8a7be33 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -147,7 +147,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case CREATE_PARTITIONS:
                 return new CreatePartitionsResponse(struct);
             case CREATE_DELEGATION_TOKEN:
-                return new CreateDelegationTokenResponse(struct);
+                return new CreateDelegationTokenResponse(struct, version);
             case RENEW_DELEGATION_TOKEN:
                 return new RenewDelegationTokenResponse(struct);
             case EXPIRE_DELEGATION_TOKEN:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
index 3277f10..61b404a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
@@ -16,126 +16,62 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
 
 public class CreateDelegationTokenRequest extends AbstractRequest {
-    private static final String RENEWERS_KEY_NAME = "renewers";
-    private static final String MAX_LIFE_TIME_KEY_NAME = "max_life_time";
-
-    private static final Schema TOKEN_CREATE_REQUEST_V0 = new Schema(
-            new Field(RENEWERS_KEY_NAME, new ArrayOf(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)),
-                    "An array of token renewers. Renewer is an Kafka PrincipalType and name string," +
-                        " who is allowed to renew this token before the max lifetime expires."),
-            new Field(MAX_LIFE_TIME_KEY_NAME, INT64,
-                    "Max lifetime period for token in milli seconds. if value is -1, then max lifetime" +
-                        "  will default to a server side config value."));
 
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema TOKEN_CREATE_REQUEST_V1 = TOKEN_CREATE_REQUEST_V0;
+    private final CreateDelegationTokenRequestData data;
 
-    private final List<KafkaPrincipal> renewers;
-    private final long maxLifeTime;
-
-    private CreateDelegationTokenRequest(short version, List<KafkaPrincipal> renewers, long maxLifeTime) {
+    private CreateDelegationTokenRequest(CreateDelegationTokenRequestData data, short version) {
         super(ApiKeys.CREATE_DELEGATION_TOKEN, version);
-        this.maxLifeTime = maxLifeTime;
-        this.renewers = renewers;
+        this.data = data;
     }
 
     public CreateDelegationTokenRequest(Struct struct, short version) {
         super(ApiKeys.CREATE_DELEGATION_TOKEN, version);
-        maxLifeTime = struct.getLong(MAX_LIFE_TIME_KEY_NAME);
-        Object[] renewerArray = struct.getArray(RENEWERS_KEY_NAME);
-        renewers = new ArrayList<>();
-        if (renewerArray != null) {
-            for (Object renewerObj : renewerArray) {
-                Struct renewerObjStruct = (Struct) renewerObj;
-                String principalType = renewerObjStruct.get(PRINCIPAL_TYPE);
-                String principalName = renewerObjStruct.get(PRINCIPAL_NAME);
-                renewers.add(new KafkaPrincipal(principalType, principalName));
-            }
-        }
+        this.data = new CreateDelegationTokenRequestData(struct, version);
     }
 
     public static CreateDelegationTokenRequest parse(ByteBuffer buffer, short version) {
         return new CreateDelegationTokenRequest(ApiKeys.CREATE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
     }
 
-    public static Schema[] schemaVersions() {
-        return new Schema[]{TOKEN_CREATE_REQUEST_V0, TOKEN_CREATE_REQUEST_V1};
-    }
-
     @Override
     protected Struct toStruct() {
-        short version = version();
-        Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.requestSchema(version));
-        Object[] renewersArray = new Object[renewers.size()];
-
-        int i = 0;
-        for (KafkaPrincipal principal: renewers) {
-            Struct renewerStruct = struct.instance(RENEWERS_KEY_NAME);
-            renewerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType());
-            renewerStruct.set(PRINCIPAL_NAME, principal.getName());
-            renewersArray[i++] = renewerStruct;
-        }
+        return data.toStruct(version());
+    }
 
-        struct.set(RENEWERS_KEY_NAME, renewersArray);
-        struct.set(MAX_LIFE_TIME_KEY_NAME, maxLifeTime);
-        return struct;
+    public CreateDelegationTokenRequestData data() {
+        return data;
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new CreateDelegationTokenResponse(throttleTimeMs, Errors.forException(e), KafkaPrincipal.ANONYMOUS);
-    }
-
-    public List<KafkaPrincipal> renewers() {
-        return renewers;
-    }
-
-    public long maxLifeTime() {
-        return maxLifeTime;
+        return CreateDelegationTokenResponse.prepareResponse(throttleTimeMs, Errors.forException(e), KafkaPrincipal.ANONYMOUS);
     }
 
     public static class Builder extends AbstractRequest.Builder<CreateDelegationTokenRequest> {
-        private final List<KafkaPrincipal> renewers;
-        private final long maxLifeTime;
+        private final CreateDelegationTokenRequestData data;
 
-        public Builder(List<KafkaPrincipal> renewers, long maxLifeTime) {
+        public Builder(CreateDelegationTokenRequestData data) {
             super(ApiKeys.CREATE_DELEGATION_TOKEN);
-            this.renewers = renewers;
-            this.maxLifeTime = maxLifeTime;
+            this.data = data;
         }
 
         @Override
         public CreateDelegationTokenRequest build(short version) {
-            return new CreateDelegationTokenRequest(version, renewers, maxLifeTime);
+            return new CreateDelegationTokenRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type: CreateDelegationTokenRequest").
-                append(", renewers=").append(renewers).
-                append(", maxLifeTime=").append(maxLifeTime).
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
index 8bb6631..04db17a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
@@ -16,160 +16,82 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
 public class CreateDelegationTokenResponse extends AbstractResponse {
 
-    private static final String OWNER_KEY_NAME = "owner";
-    private static final String ISSUE_TIMESTAMP_KEY_NAME = "issue_timestamp";
-    private static final String EXPIRY_TIMESTAMP_NAME = "expiry_timestamp";
-    private static final String MAX_TIMESTAMP_NAME = "max_timestamp";
-    private static final String TOKEN_ID_KEY_NAME = "token_id";
-    private static final String HMAC_KEY_NAME = "hmac";
-
-    private final Errors error;
-    private final long issueTimestamp;
-    private final long expiryTimestamp;
-    private final long maxTimestamp;
-    private final String tokenId;
-    private final ByteBuffer hmac;
-    private final int throttleTimeMs;
-    private KafkaPrincipal owner;
-
-    private static final Schema TOKEN_CREATE_RESPONSE_V0 = new Schema(
-        ERROR_CODE,
-        new Field(OWNER_KEY_NAME, new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME), "token owner."),
-        new Field(ISSUE_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) when this token was generated."),
-        new Field(EXPIRY_TIMESTAMP_NAME, INT64, "timestamp (in msec) at which this token expires."),
-        new Field(MAX_TIMESTAMP_NAME, INT64, "max life time of this token."),
-        new Field(TOKEN_ID_KEY_NAME, STRING, "UUID to ensure uniqueness."),
-        new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token."),
-        THROTTLE_TIME_MS);
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema TOKEN_CREATE_RESPONSE_V1 = TOKEN_CREATE_RESPONSE_V0;
-
-    public CreateDelegationTokenResponse(int throttleTimeMs,
-                                         Errors error,
-                                         KafkaPrincipal owner,
-                                         long issueTimestamp,
-                                         long expiryTimestamp,
-                                         long maxTimestamp,
-                                         String tokenId,
-                                         ByteBuffer hmac) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.owner = owner;
-        this.issueTimestamp = issueTimestamp;
-        this.expiryTimestamp = expiryTimestamp;
-        this.maxTimestamp = maxTimestamp;
-        this.tokenId = tokenId;
-        this.hmac = hmac;
-    }
+    private final CreateDelegationTokenResponseData data;
 
-    public CreateDelegationTokenResponse(int throttleTimeMs, Errors error, KafkaPrincipal owner) {
-        this(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {}));
+    public CreateDelegationTokenResponse(CreateDelegationTokenResponseData data) {
+        this.data = data;
     }
 
-    public CreateDelegationTokenResponse(Struct struct) {
-        error = Errors.forCode(struct.get(ERROR_CODE));
-        Struct ownerStruct = (Struct) struct.get(OWNER_KEY_NAME);
-        String principalType = ownerStruct.get(PRINCIPAL_TYPE);
-        String principalName = ownerStruct.get(PRINCIPAL_NAME);
-        owner = new KafkaPrincipal(principalType, principalName);
-        issueTimestamp = struct.getLong(ISSUE_TIMESTAMP_KEY_NAME);
-        expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_NAME);
-        maxTimestamp = struct.getLong(MAX_TIMESTAMP_NAME);
-        tokenId = struct.getString(TOKEN_ID_KEY_NAME);
-        hmac = struct.getBytes(HMAC_KEY_NAME);
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+    public CreateDelegationTokenResponse(Struct struct, short version) {
+        this.data = new CreateDelegationTokenResponseData(struct, version);
     }
 
     public static CreateDelegationTokenResponse parse(ByteBuffer buffer, short version) {
-        return new CreateDelegationTokenResponse(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
-    }
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {TOKEN_CREATE_RESPONSE_V0, TOKEN_CREATE_RESPONSE_V1};
-    }
-
-    @Override
-    public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
-    }
-
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version));
-        struct.set(ERROR_CODE, error.code());
-        Struct ownerStruct = struct.instance(OWNER_KEY_NAME);
-        ownerStruct.set(PRINCIPAL_TYPE, owner.getPrincipalType());
-        ownerStruct.set(PRINCIPAL_NAME, owner.getName());
-        struct.set(OWNER_KEY_NAME, ownerStruct);
-        struct.set(ISSUE_TIMESTAMP_KEY_NAME, issueTimestamp);
-        struct.set(EXPIRY_TIMESTAMP_NAME, expiryTimestamp);
-        struct.set(MAX_TIMESTAMP_NAME, maxTimestamp);
-        struct.set(TOKEN_ID_KEY_NAME, tokenId);
-        struct.set(HMAC_KEY_NAME, hmac);
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-        return struct;
+        return new CreateDelegationTokenResponse(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version).read(buffer), version);
     }
 
-    public Errors error() {
-        return error;
-    }
-
-    public KafkaPrincipal owner() {
-        return owner;
+    public static CreateDelegationTokenResponse prepareResponse(int throttleTimeMs,
+            Errors error,
+            KafkaPrincipal owner,
+            long issueTimestamp,
+            long expiryTimestamp,
+            long maxTimestamp,
+            String tokenId,
+            ByteBuffer hmac) {
+        CreateDelegationTokenResponseData data = new CreateDelegationTokenResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(error.code())
+                .setPrincipalType(owner.getPrincipalType())
+                .setPrincipalName(owner.getName())
+                .setIssueTimestampMs(issueTimestamp)
+                .setExpiryTimestampMs(expiryTimestamp)
+                .setMaxTimestampMs(maxTimestamp)
+                .setTokenId(tokenId)
+                .setHmac(hmac.array());
+        return new CreateDelegationTokenResponse(data);
     }
 
-    public long issueTimestamp() {
-        return issueTimestamp;
+    public static CreateDelegationTokenResponse prepareResponse(int throttleTimeMs, Errors error, KafkaPrincipal owner) {
+        return prepareResponse(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {}));
     }
 
-    public long expiryTimestamp() {
-        return expiryTimestamp;
+    public CreateDelegationTokenResponseData data() {
+        return data;
     }
 
-    public long maxTimestamp() {
-        return maxTimestamp;
-    }
-
-    public String tokenId() {
-        return tokenId;
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return Collections.singletonMap(error(), 1);
     }
 
-    public byte[] hmacBytes() {
-        byte[] byteArray = new byte[hmac.remaining()];
-        hmac.get(byteArray);
-        return byteArray;
+    @Override
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
+    }
+
+    public Errors error() {
+        return Errors.forCode(data.errorCode());
     }
 
     public boolean hasError() {
-        return this.error != Errors.NONE;
+        return error() != Errors.NONE;
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 11d85d6..1fb0880 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -37,6 +37,9 @@ import org.apache.kafka.common.message.ControlledShutdownRequestData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
@@ -1484,15 +1487,30 @@ public class RequestResponseTest {
     }
 
     private CreateDelegationTokenRequest createCreateTokenRequest() {
-        List<KafkaPrincipal> renewers = new ArrayList<>();
-        renewers.add(SecurityUtils.parseKafkaPrincipal("User:user1"));
-        renewers.add(SecurityUtils.parseKafkaPrincipal("User:user2"));
-        return new CreateDelegationTokenRequest.Builder(renewers, System.currentTimeMillis()).build();
+        List<CreatableRenewers> renewers = new ArrayList<>();
+        renewers.add(new CreatableRenewers()
+                .setPrincipalType("User")
+                .setPrincipalName("user1"));
+        renewers.add(new CreatableRenewers()
+                .setPrincipalType("User")
+                .setPrincipalName("user2"));
+        return new CreateDelegationTokenRequest.Builder(new CreateDelegationTokenRequestData()
+                .setRenewers(renewers)
+                .setMaxLifetimeMs(System.currentTimeMillis())).build();
     }
 
     private CreateDelegationTokenResponse createCreateTokenResponse() {
-        return new CreateDelegationTokenResponse(20, Errors.NONE, SecurityUtils.parseKafkaPrincipal("User:user1"), System.currentTimeMillis(),
-            System.currentTimeMillis(), System.currentTimeMillis(), "token1", ByteBuffer.wrap("test".getBytes()));
+        CreateDelegationTokenResponseData data = new CreateDelegationTokenResponseData()
+                .setThrottleTimeMs(20)
+                .setErrorCode(Errors.NONE.code())
+                .setPrincipalType("User")
+                .setPrincipalName("user1")
+                .setIssueTimestampMs(System.currentTimeMillis())
+                .setExpiryTimestampMs(System.currentTimeMillis())
+                .setMaxTimestampMs(System.currentTimeMillis())
+                .setTokenId("token1")
+                .setHmac("test".getBytes());
+        return new CreateDelegationTokenResponse(data);
     }
 
     private RenewDelegationTokenRequest createRenewTokenRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fa45cf5..e1000d0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2337,25 +2337,26 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace("Sending create token response for correlation id %d to client %s."
         .format(request.header.correlationId, request.header.clientId))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new CreateDelegationTokenResponse(requestThrottleMs, createResult.error, request.session.principal, createResult.issueTimestamp,
+        CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, createResult.error, request.session.principal, createResult.issueTimestamp,
           createResult.expiryTimestamp, createResult.maxTimestamp, createResult.tokenId, ByteBuffer.wrap(createResult.hmac)))
     }
 
     if (!allowTokenRequests(request))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new CreateDelegationTokenResponse(requestThrottleMs, Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, request.session.principal))
+        CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, request.session.principal))
     else {
-      val renewerList = createTokenRequest.renewers().asScala.toList
+      val renewerList = createTokenRequest.data.renewers.asScala.toList.map(entry =>
+        new KafkaPrincipal(entry.principalType, entry.principalName))
 
-      if (renewerList.exists(principal =>  principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) {
+      if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) {
         sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new CreateDelegationTokenResponse(requestThrottleMs, Errors.INVALID_PRINCIPAL_TYPE, request.session.principal))
+          CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, Errors.INVALID_PRINCIPAL_TYPE, request.session.principal))
       }
       else {
         tokenManager.createToken(
           request.session.principal,
-          createTokenRequest.renewers().asScala.toList,
-          createTokenRequest.maxLifeTime(),
+          renewerList,
+          createTokenRequest.data.maxLifetimeMs,
           sendResponseCallback
         )
       }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index c0d9b44..493b3e3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.ControlledShutdownRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData
 import org.apache.kafka.common.message.DeleteTopicsRequestData
 import org.apache.kafka.common.message.DescribeGroupsRequestData
 import org.apache.kafka.common.message.FindCoordinatorRequestData
@@ -429,7 +430,13 @@ class RequestQuotaTest extends BaseRequestTest {
           )
 
         case ApiKeys.CREATE_DELEGATION_TOKEN =>
-          new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000)
+          new CreateDelegationTokenRequest.Builder(
+              new CreateDelegationTokenRequestData()
+                .setRenewers(Collections.singletonList(new CreateDelegationTokenRequestData.CreatableRenewers()
+                .setPrincipalType("User")
+                .setPrincipalName("test")))
+                .setMaxLifetimeMs(1000)
+          )
 
         case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
           new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
@@ -548,7 +555,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.ALTER_REPLICA_LOG_DIRS => new AlterReplicaLogDirsResponse(response).throttleTimeMs
       case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs
       case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
-      case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response).throttleTimeMs
+      case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response, ApiKeys.CREATE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
       case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
       case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
       case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs