You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/04/11 17:48:09 UTC

[kafka] branch trunk updated: KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 47918f2  KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427)
47918f2 is described below

commit 47918f2d79e907f6a6da599ab82a97c169722229
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Wed Apr 11 23:18:04 2018 +0530

    KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427)
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 build.gradle                                       |   1 +
 checkstyle/suppressions.xml                        |   2 +-
 .../apache/kafka/clients/admin/AdminClient.java    | 154 +++++++++++++++++++++
 .../admin/CreateDelegationTokenOptions.java        |  53 +++++++
 .../clients/admin/CreateDelegationTokenResult.java |  43 ++++++
 .../admin/DescribeDelegationTokenOptions.java      |  48 +++++++
 .../admin/DescribeDelegationTokenResult.java       |  45 ++++++
 .../admin/ExpireDelegationTokenOptions.java}       |  26 ++--
 .../admin/ExpireDelegationTokenResult.java}        |  29 ++--
 .../kafka/clients/admin/KafkaAdminClient.java      | 137 ++++++++++++++++++
 .../admin/RenewDelegationTokenOptions.java}        |  26 ++--
 .../admin/RenewDelegationTokenResult.java}         |  29 ++--
 .../kafka/common/network/ChannelBuilders.java      |   2 +-
 .../kafka/common/network/SaslChannelBuilder.java   |   2 +-
 .../requests/DescribeDelegationTokenResponse.java  |   4 +
 .../requests/ExpireDelegationTokenRequest.java     |   4 +-
 .../requests/ExpireDelegationTokenResponse.java    |   4 +
 .../requests/RenewDelegationTokenRequest.java      |   4 +-
 .../requests/RenewDelegationTokenResponse.java     |   4 +
 .../security/scram/internal/ScramSaslServer.java   |   2 +-
 .../scram/internal/ScramServerCallbackHandler.java |   4 +-
 .../security/token/delegation/DelegationToken.java |  11 +-
 .../token/delegation/TokenInformation.java         |   6 +
 .../{ => internal}/DelegationTokenCache.java       |   4 +-
 .../DelegationTokenCredentialCallback.java         |   2 +-
 .../kafka/clients/admin/MockAdminClient.java       |  20 +++
 .../apache/kafka/common/network/NioEchoServer.java |   2 +-
 .../kafka/common/requests/RequestResponseTest.java |   4 +-
 .../scram/internal/ScramSaslServerTest.java        |   2 +-
 core/src/main/scala/kafka/admin/AdminClient.scala  |  29 ----
 .../scala/kafka/admin/DelegationTokenCommand.scala |  88 ++++++------
 .../scala/kafka/security/CredentialProvider.scala  |   2 +-
 .../kafka/server/DelegationTokenManager.scala      |   3 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   2 +-
 .../DelegationTokenEndToEndAuthorizationTest.scala |   8 +-
 .../kafka/admin/DelegationTokenCommandTest.scala   | 147 ++++++++++++++++++++
 .../delegation/DelegationTokenManagerTest.scala    |   3 +-
 .../DelegationTokenRequestsOnPlainTextTest.scala   |  27 ++--
 .../kafka/server/DelegationTokenRequestsTest.scala | 102 ++++++++------
 ...nTokenRequestsWithDisableTokenFeatureTest.scala |  32 ++---
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   4 +-
 41 files changed, 907 insertions(+), 214 deletions(-)

diff --git a/build.gradle b/build.gradle
index f836980..69f560e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -858,6 +858,7 @@ project(':clients') {
     include "**/org/apache/kafka/common/config/*"
     include "**/org/apache/kafka/common/security/auth/*"
     include "**/org/apache/kafka/server/policy/*"
+    include "**/org/apache/kafka/common/security/token/delegation/*"
   }
 }
 
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0fec810..2767132 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -10,7 +10,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
+              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
     <suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 897e127..53b77ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -535,4 +535,158 @@ public abstract class AdminClient implements AutoCloseable {
      */
     public abstract DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
                                                       DeleteRecordsOptions options);
+
+    /**
+     * <p>Create a Delegation Token.</p>
+     *
+     * <p>This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options.
+     * See the overload for more details.</p>
+     *
+     * @return                      The CreateDelegationTokenResult.
+     */
+    public CreateDelegationTokenResult createDelegationToken() {
+        return createDelegationToken(new CreateDelegationTokenOptions());
+    }
+
+
+    /**
+     * <p>Create a Delegation Token.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@code CreateDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     if the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     *     <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
+     *     if the renewers principal type is not supported.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param options               The options to use when creating delegation token.
+     * @return                      The CreateDelegationTokenResult.
+     */
+    public abstract CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
+
+
+    /**
+     * <p>Renew a Delegation Token.</p>
+     *
+     * <p>This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options.
+     * See the overload for more details.</p>
+     *
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @return                      The RenewDelegationTokenResult.
+     */
+    public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
+        return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
+    }
+
+    /**
+     * <p>Renew a Delegation Token.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code RenewDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     if the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     *     if the delegation token is not found on server.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     *     if the authenticated user is not owner/renewer of the token.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     *     if the delegation token is expired.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @param options               The options to use when renewing delegation token.
+     * @return                      The RenewDelegationTokenResult.
+     */
+    public abstract RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);
+
+    /**
+     * <p>Expire a Delegation Token.</p>
+     *
+     * <p>This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options.
+     * This will expire the token immediately. See the overload for more details.</p>
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @return                      The ExpireDelegationTokenResult.
+     */
+    public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
+        return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
+    }
+
+    /**
+     * <p>Expire a Delegation Token.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code ExpireDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     if the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     *     if the delegation token is not found on server.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     *     if the authenticated user is not owner/renewer of the requested token.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     *     if the delegation token is expired.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @param options               The options to use when expiring delegation token.
+     * @return                      The ExpireDelegationTokenResult.
+     */
+    public abstract ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);
+
+    /**
+     * <p>Describe the Delegation Tokens.</p>
+     *
+     * <p>This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
+     * This will return all the tokens owned by the user and the tokens for which the user has Describe permission. See the overload for more details.</p>
+     *
+     * @return                      The DescribeDelegationTokenResult.
+     */
+    public DescribeDelegationTokenResult describeDelegationToken() {
+        return describeDelegationToken(new DescribeDelegationTokenOptions());
+    }
+
+    /**
+     * <p>Describe the Delegation Tokens.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@code DescribeDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     if the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param options               The options to use when describing delegation tokens.
+     * @return                      The DescribeDelegationTokenResult.
+     */
+    public abstract DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
new file mode 100644
index 0000000..1b77b94
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+/**
+ * Options for {@link AdminClient#createDelegationToken(CreateDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class CreateDelegationTokenOptions extends AbstractOptions<CreateDelegationTokenOptions> {
+    private long maxLifeTimeMs = -1;
+    private List<KafkaPrincipal> renewers =  new LinkedList<>();
+
+    public CreateDelegationTokenOptions renewers(List<KafkaPrincipal> renewers) {
+        this.renewers = renewers;
+        return this;
+    }
+
+    public List<KafkaPrincipal> renewers() {
+        return renewers;
+    }
+
+    public CreateDelegationTokenOptions maxlifeTimeMs(long maxLifeTimeMs) {
+        this.maxLifeTimeMs = maxLifeTimeMs;
+        return this;
+    }
+
+    public long maxlifeTimeMs() {
+        return maxLifeTimeMs;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
new file mode 100644
index 0000000..043cbe8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+
+/**
+ * The result of the {@link KafkaAdminClient#createDelegationToken(CreateDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class CreateDelegationTokenResult {
+    private final KafkaFuture<DelegationToken> delegationToken;
+
+    CreateDelegationTokenResult(KafkaFuture<DelegationToken> delegationToken) {
+        this.delegationToken = delegationToken;
+    }
+
+    /**
+     * Returns a future which yields a delegation token
+     */
+    public KafkaFuture<DelegationToken> delegationToken() {
+        return delegationToken;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
new file mode 100644
index 0000000..60b9935
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+/**
+ * Options for {@link AdminClient#describeDelegationToken(DescribeDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDelegationTokenOptions> {
+    private List<KafkaPrincipal> owners;
+
+    /**
+     * If owners is null, all the tokens owned by the user and the tokens for which the user has Describe permission
+     * will be returned.
+     * @param owners the token owners to filter by; may be null
+     * @return this instance
+     */
+    public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners) {
+        this.owners = owners;
+        return this;
+    }
+
+    public List<KafkaPrincipal> owners() {
+        return owners;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
new file mode 100644
index 0000000..7a9d4b9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+
+/**
+ * The result of the {@link KafkaAdminClient#describeDelegationToken(DescribeDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeDelegationTokenResult {
+    private final KafkaFuture<List<DelegationToken>> delegationTokens;
+
+    DescribeDelegationTokenResult(KafkaFuture<List<DelegationToken>> delegationTokens) {
+        this.delegationTokens = delegationTokens;
+    }
+
+    /**
+     * Returns a future which yields a list of delegation tokens
+     */
+    public KafkaFuture<List<DelegationToken>> delegationTokens() {
+        return delegationTokens;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
similarity index 54%
copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
index 7490a3e..138cd4e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
@@ -14,18 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.token.delegation;
 
-import org.apache.kafka.common.security.scram.ScramCredentialCallback;
+package org.apache.kafka.clients.admin;
 
-public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
-    private String tokenOwner;
+import org.apache.kafka.common.annotation.InterfaceStability;
 
-    public void tokenOwner(String tokenOwner) {
-        this.tokenOwner = tokenOwner;
+/**
+ * Options for {@link AdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ExpireDelegationTokenOptions extends AbstractOptions<ExpireDelegationTokenOptions> {
+    private long expiryTimePeriodMs = -1L;
+
+    public ExpireDelegationTokenOptions expiryTimePeriodMs(long expiryTimePeriodMs) {
+        this.expiryTimePeriodMs = expiryTimePeriodMs;
+        return this;
     }
 
-    public String tokenOwner() {
-        return tokenOwner;
+    public long expiryTimePeriodMs() {
+        return expiryTimePeriodMs;
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
similarity index 52%
copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
index 7490a3e..41782bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
@@ -14,18 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.token.delegation;
 
-import org.apache.kafka.common.security.scram.ScramCredentialCallback;
+package org.apache.kafka.clients.admin;
 
-public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
-    private String tokenOwner;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
 
-    public void tokenOwner(String tokenOwner) {
-        this.tokenOwner = tokenOwner;
+/**
+ * The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ExpireDelegationTokenResult {
+    private final KafkaFuture<Long> expiryTimestamp;
+
+    ExpireDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
+        this.expiryTimestamp = expiryTimestamp;
     }
 
-    public String tokenOwner() {
-        return tokenOwner;
+    /**
+     * Returns a future which yields the expiry timestamp
+     */
+    public KafkaFuture<Long> expiryTimestamp() {
+        return expiryTimestamp;
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5118953..3ac0e28 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -69,6 +69,8 @@ import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
+import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
@@ -85,12 +87,20 @@ import org.apache.kafka.common.requests.DescribeAclsRequest;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
+import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
 import org.apache.kafka.common.requests.Resource;
 import org.apache.kafka.common.requests.ResourceType;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
@@ -2072,4 +2082,131 @@ public class KafkaAdminClient extends AdminClient {
 
         return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures));
     }
+
+    @Override
+    public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) {
+        final KafkaFutureImpl<DelegationToken> delegationTokenFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new CreateDelegationTokenRequest.Builder(options.renewers(), options.maxlifeTimeMs());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                CreateDelegationTokenResponse response = (CreateDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    delegationTokenFuture.completeExceptionally(response.error().exception());
+                } else {
+                    TokenInformation tokenInfo =  new TokenInformation(response.tokenId(), response.owner(),
+                        options.renewers(), response.issueTimestamp(), response.maxTimestamp(), response.expiryTimestamp());
+                    DelegationToken token = new DelegationToken(tokenInfo, response.hmacBytes());
+                    delegationTokenFuture.complete(token);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                delegationTokenFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new CreateDelegationTokenResult(delegationTokenFuture);
+    }
+
+    @Override
+    public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) {
+        final KafkaFutureImpl<Long>  expiryTimeFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new RenewDelegationTokenRequest.Builder(hmac, options.renewTimePeriodMs());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                RenewDelegationTokenResponse response = (RenewDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    expiryTimeFuture.completeExceptionally(response.error().exception());
+                } else {
+                    expiryTimeFuture.complete(response.expiryTimestamp());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                expiryTimeFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new RenewDelegationTokenResult(expiryTimeFuture);
+    }
+
+    @Override
+    public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) {
+        final KafkaFutureImpl<Long>  expiryTimeFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                ExpireDelegationTokenResponse response = (ExpireDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    expiryTimeFuture.completeExceptionally(response.error().exception());
+                } else {
+                    expiryTimeFuture.complete(response.expiryTimestamp());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                expiryTimeFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new ExpireDelegationTokenResult(expiryTimeFuture);
+    }
+
+    @Override
+    public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) {
+        final KafkaFutureImpl<List<DelegationToken>>  tokensFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeDelegationTokenRequest.Builder(options.owners());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeDelegationTokenResponse response = (DescribeDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    tokensFuture.completeExceptionally(response.error().exception());
+                } else {
+                    tokensFuture.complete(response.tokens());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                tokensFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new DescribeDelegationTokenResult(tokensFuture);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
similarity index 54%
copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
index 7490a3e..238dc4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
@@ -14,18 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.token.delegation;
 
-import org.apache.kafka.common.security.scram.ScramCredentialCallback;
+package org.apache.kafka.clients.admin;
 
-public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
-    private String tokenOwner;
+import org.apache.kafka.common.annotation.InterfaceStability;
 
-    public void tokenOwner(String tokenOwner) {
-        this.tokenOwner = tokenOwner;
+/**
+ * Options for {@link AdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class RenewDelegationTokenOptions extends AbstractOptions<RenewDelegationTokenOptions> {
+    private long renewTimePeriodMs = -1;
+
+    public RenewDelegationTokenOptions renewTimePeriodMs(long renewTimePeriodMs) {
+        this.renewTimePeriodMs = renewTimePeriodMs;
+        return this;
     }
 
-    public String tokenOwner() {
-        return tokenOwner;
+    public long renewTimePeriodMs() {
+        return renewTimePeriodMs;
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
similarity index 52%
copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
index 7490a3e..38cdf1a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
@@ -14,18 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.token.delegation;
 
-import org.apache.kafka.common.security.scram.ScramCredentialCallback;
+package org.apache.kafka.clients.admin;
 
-public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
-    private String tokenOwner;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
 
-    public void tokenOwner(String tokenOwner) {
-        this.tokenOwner = tokenOwner;
+/**
+ * The result of the {@link KafkaAdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class RenewDelegationTokenResult {
+    private final KafkaFuture<Long> expiryTimestamp;
+
+    RenewDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
+        this.expiryTimestamp = expiryTimestamp;
     }
 
-    public String tokenOwner() {
-        return tokenOwner;
+    /**
+     * Returns a future which yields the expiry timestamp.
+     */
+    public KafkaFuture<Long> expiryTimestamp() {
+        return expiryTimestamp;
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 80ccb7e..078d844 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.Collections;
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 5502164..3985c7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -41,7 +41,7 @@ import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.internal.ScramMechanism;
 import org.apache.kafka.common.security.scram.internal.ScramServerCallbackHandler;
 import org.apache.kafka.common.security.ssl.SslFactory;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 import org.apache.kafka.common.utils.Java;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index dba29ea..7ba270a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -184,4 +184,8 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
     public List<DelegationToken> tokens() {
         return tokens;
     }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 0d43440..40f0aad 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -88,9 +88,9 @@ public class ExpireDelegationTokenRequest extends AbstractRequest {
         private final ByteBuffer hmac;
         private final long expiryTimePeriod;
 
-        public Builder(ByteBuffer hmac, long expiryTimePeriod) {
+        public Builder(byte[] hmac, long expiryTimePeriod) {
             super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
-            this.hmac = hmac;
+            this.hmac = ByteBuffer.wrap(hmac);
             this.expiryTimePeriod = expiryTimePeriod;
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
index f7e0ec4..1a673bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -93,4 +93,8 @@ public class ExpireDelegationTokenResponse extends AbstractResponse {
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
index 4a4b762..a65c705 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -88,9 +88,9 @@ public class RenewDelegationTokenRequest extends AbstractRequest {
         private final ByteBuffer hmac;
         private final long renewTimePeriod;
 
-        public Builder(ByteBuffer hmac, long renewTimePeriod) {
+        public Builder(byte[] hmac, long renewTimePeriod) {
             super(ApiKeys.RENEW_DELEGATION_TOKEN);
-            this.hmac = hmac;
+            this.hmac = ByteBuffer.wrap(hmac);
             this.renewTimePeriod = renewTimePeriod;
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
index 1885b48..3233f5c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
@@ -93,4 +93,8 @@ public class RenewDelegationTokenResponse extends AbstractResponse {
     public long expiryTimestamp() {
         return expiryTimestamp;
     }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
index deee0b8..6085727 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
@@ -40,7 +40,7 @@ import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFinal
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFirstMessage;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFinalMessage;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFirstMessage;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
index 377aa3d..9a3f0dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
@@ -29,8 +29,8 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.ScramCredentialCallback;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
 
 public class ScramServerCallbackHandler implements AuthenticateCallbackHandler {
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
index 05ccbda..e1f97c1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
@@ -16,11 +16,16 @@
  */
 package org.apache.kafka.common.security.token.delegation;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.utils.Base64;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+/**
+ * A class representing a delegation token.
+ *
+ */
+@InterfaceStability.Evolving
 public class DelegationToken {
     private TokenInformation tokenInformation;
     private byte[] hmac;
@@ -42,10 +47,6 @@ public class DelegationToken {
         return Base64.encoder().encodeToString(hmac);
     }
 
-    public ByteBuffer hmacBuffer() {
-        return ByteBuffer.wrap(hmac);
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
index 1d500d2..ffd2af3 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
@@ -16,11 +16,17 @@
  */
 package org.apache.kafka.common.security.token.delegation;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 import java.util.ArrayList;
 import java.util.Collection;
 
+/**
+ * A class representing the details of a delegation token.
+ *
+ */
+@InterfaceStability.Evolving
 public class TokenInformation {
 
     private KafkaPrincipal owner;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
similarity index 95%
rename from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
rename to clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
index adea210..c05b735 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.common.security.token.delegation;
+package org.apache.kafka.common.security.token.delegation.internal;
 
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.internal.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.internal.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
 
 import java.util.Collection;
 import java.util.HashMap;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
similarity index 94%
rename from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
rename to clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
index 7490a3e..294d7b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.token.delegation;
+package org.apache.kafka.common.security.token.delegation.internal;
 
 import org.apache.kafka.common.security.scram.ScramCredentialCallback;
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c141a8a..0f5df38 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -277,6 +277,26 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
+    public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
     public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index fab8e93..68979a1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -44,7 +44,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 
 /**
  * Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index bdbd106..c63cecd 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1223,7 +1223,7 @@ public class RequestResponseTest {
     }
 
     private RenewDelegationTokenRequest createRenewTokenRequest() {
-        return new RenewDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+        return new RenewDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
     }
 
     private RenewDelegationTokenResponse createRenewTokenResponse() {
@@ -1231,7 +1231,7 @@ public class RequestResponseTest {
     }
 
     private ExpireDelegationTokenRequest createExpireTokenRequest() {
-        return new ExpireDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+        return new ExpireDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
     }
 
     private ExpireDelegationTokenResponse createExpireTokenResponse() {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
index 3c4b82d..f6e43f9 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 
 import org.junit.Before;
 import org.junit.Test;
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index c010ba0..bcc11fd 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -35,8 +35,6 @@ import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
@@ -342,33 +340,6 @@ class AdminClient(val time: Time,
     ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
   }
 
-  def createToken(renewers: List[KafkaPrincipal], maxTimePeriodMs: Long = -1): (Errors, DelegationToken) = {
-    val responseBody = sendAnyNode(ApiKeys.CREATE_DELEGATION_TOKEN, new CreateDelegationTokenRequest.Builder(renewers.asJava, maxTimePeriodMs))
-    val response = responseBody.asInstanceOf[CreateDelegationTokenResponse]
-    val tokenInfo = new TokenInformation(response.tokenId, response.owner, renewers.asJava,
-      response.issueTimestamp, response.maxTimestamp, response.expiryTimestamp)
-    (response.error, new DelegationToken(tokenInfo, response.hmacBytes))
-  }
-
-  def renewToken(hmac: ByteBuffer, renewTimePeriod: Long = -1): (Errors, Long) = {
-    val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new RenewDelegationTokenRequest.Builder(hmac, renewTimePeriod))
-    val response = responseBody.asInstanceOf[RenewDelegationTokenResponse]
-    (response.error, response.expiryTimestamp)
-  }
-
-  def expireToken(hmac: ByteBuffer, expiryTimeStamp: Long = -1): (Errors, Long) = {
-    val responseBody = sendAnyNode(ApiKeys.EXPIRE_DELEGATION_TOKEN, new ExpireDelegationTokenRequest.Builder(hmac, expiryTimeStamp))
-    val response = responseBody.asInstanceOf[ExpireDelegationTokenResponse]
-    (response.error, response.expiryTimestamp)
-  }
-
-  def describeToken(owners: List[KafkaPrincipal]): (Errors, List[DelegationToken]) = {
-    val ownersList = if (owners == null) null else owners.asJava
-    val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new DescribeDelegationTokenRequest.Builder(ownersList))
-    val response = responseBody.asInstanceOf[DescribeDelegationTokenResponse]
-    (response.error, response.tokens().asScala.toList)
-  }
-
   def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
 
     def coordinatorLookup(group: String): Either[Node, Errors] = {
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
index 6c5d1ce..0e6ea86 100644
--- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -17,12 +17,13 @@
 
 package kafka.admin
 
-import java.nio.ByteBuffer
+import java.text.SimpleDateFormat
+import java.util
 
-import joptsimple._
+import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser}
 import kafka.utils.{CommandLineUtils, Exit, Logging}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.clients.admin.{CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 import org.apache.kafka.common.utils.{Base64, SecurityUtils, Utils}
@@ -71,19 +72,20 @@ object DelegationTokenCommand extends Logging {
     }
   }
 
-  def createToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
-    val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt)
+  def createToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): DelegationToken = {
+    val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt).getOrElse(new util.LinkedList[KafkaPrincipal]())
     val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue
 
     println("Calling create token operation with renewers :" + renewerPrincipals +" , max-life-time-period :"+ maxLifeTimeMs)
-    val response = adminClient.createToken(renewerPrincipals, maxLifeTimeMs)
-    response  match {
-        case (Errors.NONE, token) => println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
-        case (e, _) =>  throw new AdminOperationException(e.message)
-    }
+    val createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals)
+    val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
+    val token = createResult.delegationToken().get()
+    println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
+    token
   }
 
   def printToken(tokens: List[DelegationToken]): Unit = {
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
     print("\n%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format("TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
     for (token <- tokens) {
       val tokenInfo = token.tokenInfo
@@ -92,56 +94,59 @@ object DelegationTokenCommand extends Logging {
         token.hmacAsBase64String,
         tokenInfo.owner,
         tokenInfo.renewersAsString,
-        tokenInfo.issueTimestamp,
-        tokenInfo.expiryTimestamp,
-        tokenInfo.maxTimestamp))
+        dateFormat.format(tokenInfo.issueTimestamp),
+        dateFormat.format(tokenInfo.expiryTimestamp),
+        dateFormat.format(tokenInfo.maxTimestamp)))
       println()
     }
   }
 
-  private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): List[KafkaPrincipal] = {
+  /**
+    * Parses the values of the given principal option into Kafka principals.
+    *
+    * @param opts                parsed command-line options
+    * @param principalOptionSpec option whose values are principal strings; each value is trimmed before parsing
+    * @return `Some` Java list of parsed principals when the option is present, `None` when it is absent
+    */
+  private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Option[util.List[KafkaPrincipal]] = {
     if (opts.options.has(principalOptionSpec))
-      opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList
+      Some(opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList.asJava)
     else
-      List.empty[KafkaPrincipal]
+      None
   }
 
-  def renewToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+  /**
+    * Renews the token identified by the Base64-encoded HMAC given in `opts.hmacOpt`.
+    *
+    * @param adminClient admin client used to issue the renew request
+    * @param opts        parsed command-line options supplying the HMAC and the renew time period
+    * @return the expiry timestamp reported by the renew result
+    */
+  def renewToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
     println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs)
-    val response = adminClient.renewToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), renewTimePeriodMs)
-    response match {
-      case (Errors.NONE, expiryTimeStamp) => println("Completed renew operation. New expiry timestamp : %s".format(expiryTimeStamp))
-      case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
-    }
+    val renewResult = adminClient.renewDelegationToken(Base64.decoder.decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs))
+    // get() waits on the result future before anything is printed
+    val expiryTimeStamp = renewResult.expiryTimestamp().get()
+    // the formatted date is only for display; the raw timestamp is what gets returned
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+    println("Completed renew operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
+    expiryTimeStamp
   }
 
-  def expireToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+  /**
+    * Sends an expire request for the token identified by the Base64-encoded HMAC given in `opts.hmacOpt`.
+    *
+    * @param adminClient admin client used to issue the expire request
+    * @param opts        parsed command-line options supplying the HMAC and the expiry time period
+    * @return the expiry timestamp reported by the expire result
+    */
+  def expireToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
     println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs)
-    val response = adminClient.expireToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), expiryTimePeriodMs)
-    response match {
-      case (Errors.NONE, expiryTimeStamp) => println("Completed expire operation. New expiry timestamp : %s".format(expiryTimeStamp))
-      case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
-    }
+    val expireResult = adminClient.expireDelegationToken(Base64.decoder.decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
+    // get() waits on the result future before anything is printed
+    val expiryTimeStamp = expireResult.expiryTimestamp().get()
+    // the formatted date is only for display; the raw timestamp is what gets returned
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+    println("Completed expire operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
+    expiryTimeStamp
   }
 
-  def describeToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+  /**
+    * Describes delegation tokens, prints them and returns them.
+    *
+    * When `opts.ownerPrincipalsOpt` is absent, `null` owners are passed to the describe options and
+    * the request is reported as being "for current user".
+    *
+    * @param adminClient admin client used to issue the describe request
+    * @param opts        parsed command-line options, optionally carrying owner principals
+    * @return the tokens from the describe result, as a Scala list
+    */
+  def describeToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): List[DelegationToken] = {
     val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt)
-    println("Calling describe token operation for owners :" + ownerPrincipals)
-    val response = adminClient.describeToken(ownerPrincipals)
-    response  match {
-      case (Errors.NONE, tokens) => println("Total Number of tokens : %s".format(tokens.size)); printToken(tokens)
-      case (e, tokens) => throw new AdminOperationException(e.message)
-    }
+    if (ownerPrincipals.isEmpty)
+      println("Calling describe token operation for current user.")
+    else
+      println("Calling describe token operation for owners :" + ownerPrincipals.get)
+
+    // orNull turns the absent option into a null owner list for the Java options object
+    val describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals.orNull))
+    val tokens = describeResult.delegationTokens().get().asScala.toList
+    println("Total number of tokens : %s".format(tokens.size)); printToken(tokens)
+    tokens
   }
 
-  private def createAdminClient(opts: DelegationTokenCommandOptions): AdminClient = {
+  /**
+    * Builds the Java admin client from the properties file named by `opts.commandConfigOpt`.
+    * The bootstrap servers entry is then set from `opts.bootstrapServerOpt`, replacing any value
+    * loaded from the file.
+    */
+  private def createAdminClient(opts: DelegationTokenCommandOptions): JAdminClient = {
     val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
-    AdminClient.create(props)
+    JAdminClient.create(props)
   }
 
   class DelegationTokenCommandOptions(args: Array[String]) {
@@ -157,10 +162,11 @@ object DelegationTokenCommand extends Logging {
       .withRequiredArg
       .ofType(classOf[String])
 
-    val createOpt = parser.accepts("create", "Create a new delegation token.")
-    val renewOpt = parser.accepts("renew",  "Renew delegation token.")
-    val expiryOpt = parser.accepts("expire", "Expire delegation token.")
-    val describeOpt = parser.accepts("describe", "describe delegation tokens.")
+    val createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.")
+    val renewOpt = parser.accepts("renew",  "Renew delegation token. Use --renew-time-period option to set renew time period.")
+    val expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.")
+    val describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
+      " If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.")
 
     val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.")
       .withOptionalArg()
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 6f9c252..f208087 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.config.ConfigDef._
 import org.apache.kafka.common.security.scram.internal.{ScramCredentialUtils, ScramMechanism}
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 
 class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: DelegationTokenCache) {
 
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 4a947a1..62a5e20 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -31,7 +31,8 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internal.{ScramFormatter, ScramMechanism}
 import org.apache.kafka.common.security.scram.ScramCredential
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Base64, Sanitizer, SecurityUtils, Time}
 
 import scala.collection.JavaConverters._
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7105688..a0d2c79 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index 27a6d11..56a3b8a 100644
--- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -18,10 +18,9 @@ package kafka.api
 
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
@@ -83,9 +82,8 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
     val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
     config.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
 
-    val adminClient = AdminClient.create(config.asScala.toMap)
-    val (error, token)  = adminClient.createToken(List())
-
+    val adminClient = AdminClient.create(config)
+    val token = adminClient.createDelegationToken().delegationToken().get()
     //wait for token to reach all the brokers
     TestUtils.waitUntilTrue(() => servers.forall(server => !server.tokenCache.tokens().isEmpty),
       "Timed out waiting for token to propagate to all servers")
diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
new file mode 100644
index 0000000..6ae8f5e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -0,0 +1,147 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.admin
+
+import java.util
+
+import kafka.admin.DelegationTokenCommand.DelegationTokenCommandOptions
+import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.ExecutionException
+
+/**
+  * Exercises the create, describe, renew and expire operations of `DelegationTokenCommand`
+  * against a single SASL_PLAINTEXT broker using the PLAIN mechanism, with tokens enabled.
+  *
+  * The command methods are called directly with an admin client built by this test.
+  */
+class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List("PLAIN")
+  protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+  // created inside the test and closed in tearDown()
+  var adminClient: org.apache.kafka.clients.admin.AdminClient = null
+
+  override def numBrokers = 1
+
+  @Before
+  override def setUp(): Unit = {
+    // SASL is started before super.setUp() is invoked
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  /** Broker configs with SASL inter-broker security and delegation tokens enabled. */
+  override def generateConfigs = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+      enableControlledShutdown = false, enableDeleteTopic = true,
+      interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
+    props.foreach(propertyOverrides)
+    props.map(KafkaConfig.fromProps)
+  }
+
+  /** Admin client config: bootstrap servers plus the client security settings for this protocol. */
+  private def createAdminConfig():util.Map[String, Object] = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
+  @Test
+  def testDelegationTokenRequests(): Unit = {
+    adminClient = org.apache.kafka.clients.admin.AdminClient.create(createAdminConfig)
+    val renewer1 = "User:renewer1"
+    val renewer2 = "User:renewer2"
+
+    // create token1 with renewer1
+    val tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer1)))
+
+    // assertEquals(expected, actual) reports the actual value on failure, unlike assertTrue(a == b)
+    var tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
+    assertEquals(1, tokens.size)
+    val token1 = tokens.head
+    assertEquals(tokenCreated, token1)
+
+    // create token2 with renewer2
+    val token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer2)))
+
+    tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
+    assertEquals(2, tokens.size)
+    assertEquals(Set(token1, token2), tokens.toSet)
+
+    //get tokens for renewer2
+    tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer2)))
+    assertEquals(1, tokens.size)
+    assertEquals(Set(token2), tokens.toSet)
+
+    //test renewing tokens
+    val expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String()))
+    val renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer1))).head
+    assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp())
+
+    //test expire tokens
+    DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String()))
+    DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String()))
+
+    tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
+    assertTrue(tokens.isEmpty)
+
+    //create token with invalid renewer principal type
+    intercept[ExecutionException](DelegationTokenCommand.createToken(adminClient, getCreateOpts(List("Group:Renewer3"))))
+
+    // try describing tokens for unknown owner
+    assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List("User:Unknown"))).isEmpty)
+  }
+
+  // The option builders below pass "testfile" for --command-config; it appears to be a placeholder,
+  // since the test supplies its own admin client to the command methods.
+
+  /** Options for --create with one --renewer-principal per entry in `renewers`. */
+  private def getCreateOpts(renewers: List[String]): DelegationTokenCommandOptions = {
+    val opts = ListBuffer("--bootstrap-server", brokerList, "--max-life-time-period", "-1",
+      "--command-config", "testfile", "--create")
+    renewers.foreach(renewer => opts ++= ListBuffer("--renewer-principal", renewer))
+    new DelegationTokenCommandOptions(opts.toArray)
+  }
+
+  /** Options for --describe with one --owner-principal per entry in `owners`. */
+  private def getDescribeOpts(owners: List[String]): DelegationTokenCommandOptions = {
+    val opts = ListBuffer("--bootstrap-server", brokerList, "--command-config", "testfile", "--describe")
+    owners.foreach(owner => opts ++= ListBuffer("--owner-principal", owner))
+    new DelegationTokenCommandOptions(opts.toArray)
+  }
+
+  /** Options for --renew of the token with the given Base64 HMAC, using a renew time period of -1. */
+  private def getRenewOpts(hmac: String): DelegationTokenCommandOptions = {
+    val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--renew",
+      "--renew-time-period", "-1",
+      "--hmac", hmac)
+    new DelegationTokenCommandOptions(opts)
+  }
+
+  /** Options for --expire of the token with the given Base64 HMAC, using an expiry time period of -1. */
+  private def getExpireOpts(hmac: String): DelegationTokenCommandOptions = {
+    val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--expire",
+      "--expiry-time-period", "-1",
+      "--hmac", hmac)
+    new DelegationTokenCommandOptions(opts)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (adminClient != null)
+      adminClient.close()
+    super.tearDown()
+    closeSasl()
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index b8388b4..6093622 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -30,7 +30,8 @@ import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{MockTime, SecurityUtils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index 4c42dd2..3d4be53 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -19,14 +19,13 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
-import org.junit.Assert._
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
   var adminClient: AdminClient = null
@@ -49,21 +48,19 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
 
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
+    // Every delegation token request is expected to fail with an ExecutionException whose cause is
+    // UnsupportedByAuthenticationException. The isInstanceOf result is wrapped in assert(...) so the
+    // cause type is actually verified; assert is used because this change drops the
+    // org.junit.Assert._ import from this file.
+    adminClient = AdminClient.create(createAdminConfig)
 
-    val createResponse = adminClient.createToken(List())
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, createResponse._1)
+    val createResult = adminClient.createDelegationToken()
+    assert(intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException])
 
-    val describeResponse = adminClient.describeToken(List())
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, describeResponse._1)
+    val describeResult = adminClient.describeDelegationToken()
+    assert(intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException])
 
-    //test renewing tokens
-    val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, renewResponse._1)
+    val renewResult = adminClient.renewDelegationToken("".getBytes())
+    assert(intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException])
 
-    //test expire tokens tokens
-    val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, expireResponse._1)
+    val expireResult = adminClient.expireDelegationToken("".getBytes())
+    assert(intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException])
   }
 
 
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index 55bf5fd..a002750 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -18,17 +18,17 @@ package kafka.server
 
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
+import org.apache.kafka.common.errors.InvalidPrincipalTypeException
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.SecurityUtils
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -46,15 +46,6 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
     super.setUp()
   }
 
-  def createAdminConfig():util.Map[String, Object] = {
-    val config = new util.HashMap[String, Object]
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
-    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
-    config
-  }
-
   override def generateConfigs = {
     val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
       enableControlledShutdown = false, enableDeleteTopic = true,
@@ -64,46 +55,73 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
     props.map(KafkaConfig.fromProps)
   }
 
+  /** Assembles the admin client settings: bootstrap servers plus the client security configs. */
+  private def createAdminConfig():util.Map[String, Object] = {
+    val adminProps = new util.HashMap[String, Object]
+    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityConfigs: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    // copy each security setting across, keyed by its String name
+    for ((name, setting) <- securityConfigs.asScala)
+      adminProps.put(name.asInstanceOf[String], setting)
+    adminProps
+  }
+
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
-
-    // test creating token
-    val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
-    val tokenResult1 = adminClient.createToken(renewer1)
-    assertEquals(Errors.NONE, tokenResult1._1)
-    var token1 = adminClient.describeToken(null)._2.head
-    assertEquals(token1, tokenResult1._2)
+    // Walks through create, describe, renew and expire with the Java AdminClient, then checks the
+    // failure for an invalid renewer principal type and the empty result for an unknown owner.
+    adminClient = AdminClient.create(createAdminConfig)
+
+    // create token1 with renewer1
+    val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:renewer1")).asJava
+    val createResult1 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer1))
+    val tokenCreated = createResult1.delegationToken().get()
+
+    //test describe token
+    // assertEquals(expected, actual) reports the actual value on failure, unlike assertTrue(a == b)
+    var tokens = adminClient.describeDelegationToken().delegationTokens().get()
+    assertEquals(1, tokens.size())
+    var token1 = tokens.get(0)
+    assertEquals(tokenCreated, token1)
+
+    // create token2 with renewer2
+    val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:renewer2")).asJava
+    val createResult2 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer2))
+    val token2 = createResult2.delegationToken().get()
+
+    //get all tokens
+    tokens = adminClient.describeDelegationToken().delegationTokens().get()
+    assertEquals(2, tokens.size())
+    assertEquals(Set(token1, token2), tokens.asScala.toSet)
+
+    //get tokens for renewer2
+    tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(renewer2)).delegationTokens().get()
+    assertEquals(1, tokens.size())
+    assertEquals(Set(token2), tokens.asScala.toSet)
 
     //test renewing tokens
-    val renewResponse = adminClient.renewToken(token1.hmacBuffer())
-    assertEquals(Errors.NONE, renewResponse._1)
-
-    token1 = adminClient.describeToken(null)._2.head
-    assertEquals(renewResponse._2, token1.tokenInfo().expiryTimestamp())
+    val renewResult = adminClient.renewDelegationToken(token1.hmac())
+    var expiryTimestamp = renewResult.expiryTimestamp().get()
 
-    //test describe tokens
-    val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:Renewer1"))
-    val tokenResult2 = adminClient.createToken(renewer2)
-    assertEquals(Errors.NONE, tokenResult2._1)
-    val token2 = tokenResult2._2
+    // two tokens exist at this point, so the renewed one is looked up by its token id
+    val describeResult = adminClient.describeDelegationToken()
+    val tokenId = token1.tokenInfo().tokenId()
+    token1 = describeResult.delegationTokens().get().asScala.filter(dt => dt.tokenInfo().tokenId() == tokenId).head
+    assertEquals(expiryTimestamp, token1.tokenInfo().expiryTimestamp())
 
-    assertTrue(adminClient.describeToken(null)._2.size == 2)
+    //test expire tokens
+    // get() is called so each expire request has completed before the tokens are described again
+    val expireResult1 = adminClient.expireDelegationToken(token1.hmac())
+    expiryTimestamp = expireResult1.expiryTimestamp().get()
 
-    //test expire tokens tokens
-    val expireResponse1 = adminClient.expireToken(token1.hmacBuffer())
-    assertEquals(Errors.NONE, expireResponse1._1)
+    val expireResult2 = adminClient.expireDelegationToken(token2.hmac())
+    expiryTimestamp = expireResult2.expiryTimestamp().get()
 
-    val expireResponse2 = adminClient.expireToken(token2.hmacBuffer())
-    assertEquals(Errors.NONE, expireResponse2._1)
-
-    assertTrue(adminClient.describeToken(null)._2.size == 0)
+    tokens = adminClient.describeDelegationToken().delegationTokens().get()
+    assertTrue(tokens.isEmpty)
 
     //create token with invalid principal type
-    val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer1"))
-    val tokenResult3 = adminClient.createToken(renewer3)
-    assertEquals(Errors.INVALID_PRINCIPAL_TYPE, tokenResult3._1)
-
+    val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava
+    val createResult3 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer3))
+    // the isInstanceOf result must be asserted, otherwise the cause type is never checked
+    assertTrue(intercept[ExecutionException](createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException])
+
+    // try describing tokens for unknown owner
+    val unknownOwner = List(SecurityUtils.parseKafkaPrincipal("User:Unknown")).asJava
+    tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(unknownOwner)).delegationTokens().get()
+    assertTrue(tokens.isEmpty)
   }
 
   @After
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index 0561cac..8f8842b 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -19,17 +19,15 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.SecurityUtils
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.errors.DelegationTokenDisabledException
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -58,23 +56,19 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
 
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
-
-    val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
-    val createResponse = adminClient.createToken(renewer1)
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createResponse._1)
+    // Every delegation token request is expected to fail with an ExecutionException whose cause is
+    // DelegationTokenDisabledException. The isInstanceOf result is wrapped in assert(...) so the
+    // cause type is actually verified; assert is used because this change drops the
+    // org.junit.Assert._ import from this file.
+    adminClient = AdminClient.create(createAdminConfig)
 
-    val describeResponse = adminClient.describeToken(List())
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, describeResponse._1)
+    val createResult = adminClient.createDelegationToken()
+    assert(intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[DelegationTokenDisabledException])
 
-    //test renewing tokens
-    val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, renewResponse._1)
+    val describeResult = adminClient.describeDelegationToken()
+    assert(intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[DelegationTokenDisabledException])
 
-    //test expire tokens tokens
-    val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, expireResponse._1)
+    val renewResult = adminClient.renewDelegationToken("".getBytes())
+    assert(intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException])
 
+    val expireResult = adminClient.expireDelegationToken("".getBytes())
+    assert(intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException])
   }
 
   @After
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 2a7d6d4..ed85415 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -315,13 +315,13 @@ class RequestQuotaTest extends BaseRequestTest {
           new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000)
 
         case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
-          new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+          new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
 
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
           new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
 
         case ApiKeys.RENEW_DELEGATION_TOKEN =>
-          new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+          new RenewDelegationTokenRequest.Builder("".getBytes, 1000)
 
         case ApiKeys.DELETE_GROUPS =>
           new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.