You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/01/16 17:50:25 UTC

[kafka] branch trunk updated: KAFKA-4541; Support for delegation token mechanism

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27a8d0f  KAFKA-4541; Support for delegation token mechanism
27a8d0f is described below

commit 27a8d0f9e7f3b05f331d1449b17cf7e085a4b45c
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Tue Jan 16 09:50:31 2018 -0800

    KAFKA-4541; Support for delegation token mechanism
    
    - Add capability to create delegation tokens
    - Add authentication based on delegation tokens
    - Add capability to renew/expire delegation tokens
    - Add unit tests and integration tests
    
    Author: Manikumar Reddy <ma...@gmail.com>
    
    Reviewers: Jun Rao <ju...@gmail.com>
    
    Closes #3616 from omkreddy/KAFKA-4541
---
 bin/kafka-delegation-tokens.sh                     |  17 +
 bin/windows/kafka-delegation-tokens.bat            |  17 +
 checkstyle/suppressions.xml                        |   5 +-
 .../DelegationTokenAuthorizationException.java}    |  18 +-
 .../DelegationTokenDisabledException.java}         |  18 +-
 .../DelegationTokenExpiredException.java}          |  18 +-
 .../DelegationTokenNotFoundException.java}         |  18 +-
 .../DelegationTokenOwnerMismatchException.java}    |  18 +-
 .../InvalidPrincipalTypeException.java}            |  18 +-
 .../UnsupportedByAuthenticationException.java}     |  22 +-
 .../kafka/common/network/ChannelBuilders.java      |  13 +-
 .../kafka/common/network/SaslChannelBuilder.java   |  10 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  14 +-
 .../apache/kafka/common/protocol/CommonFields.java |   4 +
 .../org/apache/kafka/common/protocol/Errors.java   |  49 ++
 .../kafka/common/requests/AbstractRequest.java     |   8 +
 .../kafka/common/requests/AbstractResponse.java    |   8 +
 .../requests/CreateDelegationTokenRequest.java     | 136 ++++++
 .../requests/CreateDelegationTokenResponse.java    | 168 +++++++
 .../requests/DescribeDelegationTokenRequest.java   | 132 ++++++
 .../requests/DescribeDelegationTokenResponse.java  | 187 ++++++++
 .../requests/ExpireDelegationTokenRequest.java     | 112 +++++
 .../requests/ExpireDelegationTokenResponse.java    |  96 ++++
 .../requests/RenewDelegationTokenRequest.java      | 112 +++++
 .../requests/RenewDelegationTokenResponse.java     |  96 ++++
 .../apache/kafka/common/resource/ResourceType.java |   7 +-
 .../kafka/common/security/auth/KafkaPrincipal.java |  15 +-
 .../DefaultKafkaPrincipalBuilder.java              |  10 +-
 .../authenticator/SaslClientCallbackHandler.java   |   8 +-
 .../authenticator/SaslServerAuthenticator.java     |   9 +-
 ... => DelegationTokenAuthenticationCallback.java} |  13 +-
 .../security/scram/ScramCredentialCallback.java    |  24 +
 .../common/security/scram/ScramLoginModule.java    |   4 +
 .../kafka/common/security/scram/ScramMessages.java |  34 +-
 .../common/security/scram/ScramSaslClient.java     |   8 +-
 .../common/security/scram/ScramSaslServer.java     |  19 +-
 .../security/scram/ScramServerCallbackHandler.java |  17 +-
 .../security/token/delegation/DelegationToken.java |  80 ++++
 .../token/delegation/DelegationTokenCache.java     | 111 +++++
 .../token/delegation/TokenInformation.java         | 136 ++++++
 .../apache/kafka/common/network/NioEchoServer.java |  10 +-
 .../common/network/SaslChannelBuilderTest.java     |   2 +-
 .../kafka/common/protocol/ProtoUtilsTest.java      |   3 +-
 .../kafka/common/requests/RequestResponseTest.java |  70 +++
 .../kafka/common/resource/ResourceTypeTest.java    |   3 +-
 .../auth/DefaultKafkaPrincipalBuilderTest.java     |   2 +
 .../authenticator/SaslAuthenticatorTest.java       |  53 ++-
 .../authenticator/SaslServerAuthenticatorTest.java |   4 +-
 .../common/security/scram/ScramMessagesTest.java   |   7 +-
 .../common/security/scram/ScramSaslServerTest.java |   3 +-
 core/src/main/scala/kafka/admin/AclCommand.scala   |  14 +-
 core/src/main/scala/kafka/admin/AdminClient.scala  |  33 +-
 .../scala/kafka/admin/DelegationTokenCommand.scala | 214 +++++++++
 .../scala/kafka/controller/KafkaController.scala   |  19 +-
 .../main/scala/kafka/network/SocketServer.scala    |   2 +-
 .../scala/kafka/security/CredentialProvider.scala  |   3 +-
 .../scala/kafka/security/auth/ResourceType.scala   |   8 +-
 .../kafka/server/DelegationTokenManager.scala      | 515 +++++++++++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 130 +++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  31 ++
 core/src/main/scala/kafka/server/KafkaServer.scala |  18 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   4 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  73 +++
 core/src/main/scala/kafka/zk/ZkData.scala          |  38 +-
 .../DelegationTokenEndToEndAuthorizationTest.scala |  96 ++++
 .../scala/unit/kafka/admin/AclCommandTest.scala    |   7 +-
 .../kafka/integration/KafkaServerTestHarness.scala |   9 +
 .../unit/kafka/network/SocketServerTest.scala      |   2 +-
 .../delegation/DelegationTokenManagerTest.scala    | 311 +++++++++++++
 .../DelegationTokenRequestsOnPlainTextTest.scala   |  76 +++
 .../kafka/server/DelegationTokenRequestsTest.scala | 116 +++++
 ...nTokenRequestsWithDisableTokenFeatureTest.scala |  87 ++++
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  14 +
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  17 +
 .../scala/unit/kafka/utils/JaasTestUtils.scala     |  15 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  11 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  40 ++
 78 files changed, 3732 insertions(+), 140 deletions(-)

diff --git a/bin/kafka-delegation-tokens.sh b/bin/kafka-delegation-tokens.sh
new file mode 100755
index 0000000..49cb276
--- /dev/null
+++ b/bin/kafka-delegation-tokens.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.DelegationTokenCommand "$@"
diff --git a/bin/windows/kafka-delegation-tokens.bat b/bin/windows/kafka-delegation-tokens.bat
new file mode 100644
index 0000000..996537f
--- /dev/null
+++ b/bin/windows/kafka-delegation-tokens.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+"%~dp0kafka-run-class.bat" kafka.admin.DelegationTokenCommand %*
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f610e88..05d4a52 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -51,14 +51,11 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer|SaslClientAuthenticator).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
 
     <suppress checks="JavaNCSS"
               files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
 
-    <suppress checks="JavaNCSS"
-              files="AbstractRequest.java"/>
-
     <suppress checks="NPathComplexity"
               files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent).java"/>
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenAuthorizationException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenAuthorizationException.java
index 931210a..ddc97c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenAuthorizationException.java
@@ -14,18 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
 
-import javax.security.auth.callback.Callback;
+public class DelegationTokenAuthorizationException extends AuthorizationException {
 
-public class ScramCredentialCallback implements Callback {
-    private ScramCredential scramCredential;
+    private static final long serialVersionUID = 1L;
 
-    public ScramCredential scramCredential() {
-        return scramCredential;
+    public DelegationTokenAuthorizationException(String message) {
+        super(message);
     }
 
-    public void scramCredential(ScramCredential scramCredential) {
-        this.scramCredential = scramCredential;
+    public DelegationTokenAuthorizationException(String message, Throwable cause) {
+        super(message, cause);
     }
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenDisabledException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenDisabledException.java
index 931210a..798611e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenDisabledException.java
@@ -14,18 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
 
-import javax.security.auth.callback.Callback;
+public class DelegationTokenDisabledException extends ApiException {
 
-public class ScramCredentialCallback implements Callback {
-    private ScramCredential scramCredential;
+    private static final long serialVersionUID = 1L;
 
-    public ScramCredential scramCredential() {
-        return scramCredential;
+    public DelegationTokenDisabledException(String message) {
+        super(message);
     }
 
-    public void scramCredential(ScramCredential scramCredential) {
-        this.scramCredential = scramCredential;
+    public DelegationTokenDisabledException(String message, Throwable cause) {
+        super(message, cause);
     }
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenExpiredException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenExpiredException.java
index 931210a..4dae7f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenExpiredException.java
@@ -14,18 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
 
-import javax.security.auth.callback.Callback;
+public class DelegationTokenExpiredException extends ApiException {
 
-public class ScramCredentialCallback implements Callback {
-    private ScramCredential scramCredential;
+    private static final long serialVersionUID = 1L;
 
-    public ScramCredential scramCredential() {
-        return scramCredential;
+    public DelegationTokenExpiredException(String message) {
+        super(message);
     }
 
-    public void scramCredential(ScramCredential scramCredential) {
-        this.scramCredential = scramCredential;
+    public DelegationTokenExpiredException(String message, Throwable cause) {
+        super(message, cause);
     }
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenNotFoundException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenNotFoundException.java
index 931210a..5875edf 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenNotFoundException.java
@@ -14,18 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
 
-import javax.security.auth.callback.Callback;
+public class DelegationTokenNotFoundException extends ApiException {
 
-public class ScramCredentialCallback implements Callback {
-    private ScramCredential scramCredential;
+    private static final long serialVersionUID = 1L;
 
-    public ScramCredential scramCredential() {
-        return scramCredential;
+    public DelegationTokenNotFoundException(String message) {
+        super(message);
     }
 
-    public void scramCredential(ScramCredential scramCredential) {
-        this.scramCredential = scramCredential;
+    public DelegationTokenNotFoundException(String message, Throwable cause) {
+        super(message, cause);
     }
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenOwnerMismatchException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenOwnerMismatchException.java
index 931210a..5c8239e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenOwnerMismatchException.java
@@ -14,18 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
 
-import javax.security.auth.callback.Callback;
+public class DelegationTokenOwnerMismatchException extends ApiException {
 
-public class ScramCredentialCallback implements Callback {
-    private ScramCredential scramCredential;
+    private static final long serialVersionUID = 1L;
 
-    public ScramCredential scramCredential() {
-        return scramCredential;
+    public DelegationTokenOwnerMismatchException(String message) {
+        super(message);
     }
 
-    public void scramCredential(ScramCredential scramCredential) {
-        this.scramCredential = scramCredential;
+    public DelegationTokenOwnerMismatchException(String message, Throwable cause) {
+        super(message, cause);
     }
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPrincipalTypeException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/InvalidPrincipalTypeException.java
index 931210a..a0736e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPrincipalTypeException.java
@@ -14,18 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
 
-import javax.security.auth.callback.Callback;
+public class InvalidPrincipalTypeException extends ApiException {
 
-public class ScramCredentialCallback implements Callback {
-    private ScramCredential scramCredential;
+    private static final long serialVersionUID = 1L;
 
-    public ScramCredential scramCredential() {
-        return scramCredential;
+    public InvalidPrincipalTypeException(String message) {
+        super(message);
     }
 
-    public void scramCredential(ScramCredential scramCredential) {
-        this.scramCredential = scramCredential;
+    public InvalidPrincipalTypeException(String message, Throwable cause) {
+        super(message, cause);
     }
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedByAuthenticationException.java
similarity index 64%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/UnsupportedByAuthenticationException.java
index 931210a..40f357c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedByAuthenticationException.java
@@ -14,18 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
 
-import javax.security.auth.callback.Callback;
-
-public class ScramCredentialCallback implements Callback {
-    private ScramCredential scramCredential;
+/**
+ * Authentication mechanism does not support the requested function.
+ */
+public class UnsupportedByAuthenticationException extends ApiException {
+    private static final long serialVersionUID = 1L;
 
-    public ScramCredential scramCredential() {
-        return scramCredential;
+    public UnsupportedByAuthenticationException(String message) {
+        super(message);
     }
 
-    public void scramCredential(ScramCredential scramCredential) {
-        this.scramCredential = scramCredential;
+    public UnsupportedByAuthenticationException(String message, Throwable cause) {
+        super(message, cause);
     }
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 74bd0a0..8f301ad 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.Map;
@@ -59,7 +60,7 @@ public class ChannelBuilders {
                 throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
         }
         return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, clientSaslMechanism,
-                saslHandshakeRequestEnable, null);
+                saslHandshakeRequestEnable, null, null);
     }
 
     /**
@@ -72,9 +73,10 @@ public class ChannelBuilders {
     public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
                                                       SecurityProtocol securityProtocol,
                                                       AbstractConfig config,
-                                                      CredentialCache credentialCache) {
+                                                      CredentialCache credentialCache,
+                                                      DelegationTokenCache tokenCache) {
         return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null,
-                true, credentialCache);
+                true, credentialCache, tokenCache);
     }
 
     private static ChannelBuilder create(SecurityProtocol securityProtocol,
@@ -84,7 +86,8 @@ public class ChannelBuilders {
                                          ListenerName listenerName,
                                          String clientSaslMechanism,
                                          boolean saslHandshakeRequestEnable,
-                                         CredentialCache credentialCache) {
+                                         CredentialCache credentialCache,
+                                         DelegationTokenCache tokenCache) {
         Map<String, ?> configs;
         if (listenerName == null)
             configs = config.values();
@@ -102,7 +105,7 @@ public class ChannelBuilders {
                 requireNonNullMode(mode, securityProtocol);
                 JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
                 channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol, listenerName,
-                        clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
+                        clientSaslMechanism, saslHandshakeRequestEnable, credentialCache, tokenCache);
                 break;
             case PLAINTEXT:
                 channelBuilder = new PlaintextChannelBuilder();
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 72a2bc1..d6dd5fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -22,11 +22,12 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.authenticator.LoginManager;
 import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
-import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.Java;
 import org.slf4j.Logger;
@@ -53,6 +54,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
     private final JaasContext jaasContext;
     private final boolean handshakeRequestEnable;
     private final CredentialCache credentialCache;
+    private final DelegationTokenCache tokenCache;
 
     private LoginManager loginManager;
     private SslFactory sslFactory;
@@ -65,7 +67,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
                               ListenerName listenerName,
                               String clientSaslMechanism,
                               boolean handshakeRequestEnable,
-                              CredentialCache credentialCache) {
+                              CredentialCache credentialCache,
+                              DelegationTokenCache tokenCache) {
         this.mode = mode;
         this.jaasContext = jaasContext;
         this.securityProtocol = securityProtocol;
@@ -73,6 +76,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
         this.handshakeRequestEnable = handshakeRequestEnable;
         this.clientSaslMechanism = clientSaslMechanism;
         this.credentialCache = credentialCache;
+        this.tokenCache = tokenCache;
     }
 
     @Override
@@ -153,7 +157,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
     protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
             TransportLayer transportLayer, Subject subject) throws IOException {
         return new SaslServerAuthenticator(configs, id, jaasContext, subject,
-                kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer);
+                kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer, tokenCache);
     }
 
     // Visible to override for testing
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index cf1bff5..b408e80 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
+import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
+import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
@@ -53,8 +55,12 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
 import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
+import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
@@ -83,6 +89,8 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
 import org.apache.kafka.common.requests.SaslAuthenticateRequest;
 import org.apache.kafka.common.requests.SaslAuthenticateResponse;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
@@ -171,7 +179,11 @@ public enum ApiKeys {
     SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequest.schemaVersions(),
             SaslAuthenticateResponse.schemaVersions()),
     CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
-            CreatePartitionsResponse.schemaVersions());
+            CreatePartitionsResponse.schemaVersions()),
+    CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()),
+    RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
+    EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
+    DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions());
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index e1d1884..a436dff 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -50,4 +50,8 @@ public class CommonFields {
     public static final Field.NullableStr HOST_FILTER = new Field.NullableStr("host", "The ACL host filter");
     public static final Field.Int8 OPERATION = new Field.Int8("operation", "The ACL operation");
     public static final Field.Int8 PERMISSION_TYPE = new Field.Int8("permission_type", "The ACL permission type");
+
+    public static final Field.Str PRINCIPAL_TYPE = new Field.Str("principal_type", "principalType of the Kafka principal");
+    public static final Field.Str PRINCIPAL_NAME = new Field.Str("name", "name of the Kafka principal");
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d937054..bd5b800 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -27,6 +28,9 @@ import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.DuplicateSequenceException;
+import org.apache.kafka.common.errors.DelegationTokenDisabledException;
+import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
+import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
@@ -68,6 +72,8 @@ import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.DelegationTokenAuthorizationException;
+import org.apache.kafka.common.errors.DelegationTokenExpiredException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
@@ -76,6 +82,7 @@ import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -545,6 +552,48 @@ public enum Errors {
             public ApiException build(String message) {
                 return new ReassignmentInProgressException(message);
             }
+        }),
+    DELEGATION_TOKEN_AUTH_DISABLED(61, "Delegation Token feature is not enabled.", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new DelegationTokenDisabledException(message);
+        }
+    }),
+    DELEGATION_TOKEN_NOT_FOUND(62, "Delegation Token is not found on server.", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new DelegationTokenNotFoundException(message);
+        }
+    }),
+    DELEGATION_TOKEN_OWNER_MISMATCH(63, "Specified Principal is not valid Owner/Renewer.", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new DelegationTokenOwnerMismatchException(message);
+        }
+    }),
+    DELEGATION_TOKEN_REQUEST_NOT_ALLOWED(64, "Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and " + "on delegation token authenticated channels.", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new UnsupportedByAuthenticationException(message);
+        }
+    }),
+    DELEGATION_TOKEN_AUTHORIZATION_FAILED(65, "Delegation Token authorization failed.", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new DelegationTokenAuthorizationException(message);
+        }
+    }),
+    DELEGATION_TOKEN_EXPIRED(66, "Delegation Token is expired.", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new DelegationTokenExpiredException(message);
+        }
+    }),
+    INVALID_PRINCIPAL_TYPE(67, "Supplied principalType is not supported", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new InvalidPrincipalTypeException(message);
+        }
     });
 
     private interface ApiExceptionBuilder {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 5a1c4f4..cd213d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -214,6 +214,14 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new SaslAuthenticateRequest(struct, apiVersion);
             case CREATE_PARTITIONS:
                 return new CreatePartitionsRequest(struct, apiVersion);
+            case CREATE_DELEGATION_TOKEN:
+                return new CreateDelegationTokenRequest(struct, apiVersion);
+            case RENEW_DELEGATION_TOKEN:
+                return new RenewDelegationTokenRequest(struct, apiVersion);
+            case EXPIRE_DELEGATION_TOKEN:
+                return new ExpireDelegationTokenRequest(struct, apiVersion);
+            case DESCRIBE_DELEGATION_TOKEN:
+                return new DescribeDelegationTokenRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 6294af4..fb01298 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -146,6 +146,14 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new SaslAuthenticateResponse(struct);
             case CREATE_PARTITIONS:
                 return new CreatePartitionsResponse(struct);
+            case CREATE_DELEGATION_TOKEN:
+                return new CreateDelegationTokenResponse(struct);
+            case RENEW_DELEGATION_TOKEN:
+                return new RenewDelegationTokenResponse(struct);
+            case EXPIRE_DELEGATION_TOKEN:
+                return new ExpireDelegationTokenResponse(struct);
+            case DESCRIBE_DELEGATION_TOKEN:
+                return new DescribeDelegationTokenResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
new file mode 100644
index 0000000..7c48205
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
+
+public class CreateDelegationTokenRequest extends AbstractRequest {
+    private static final String RENEWERS_KEY_NAME = "renewers";
+    private static final String MAX_LIFE_TIME_KEY_NAME = "max_life_time";
+
+    private static final Schema TOKEN_CREATE_REQUEST_V0 = new Schema(
+            new Field(RENEWERS_KEY_NAME, new ArrayOf(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)),
+                    "An array of token renewers. Renewer is an Kafka PrincipalType and name string," +
+                        " who is allowed to renew this token before the max lifetime expires."),
+            new Field(MAX_LIFE_TIME_KEY_NAME, INT64,
+                    "Max lifetime period for token in milli seconds. if value is -1, then max lifetime" +
+                        "  will default to a server side config value."));
+
+    private final List<KafkaPrincipal> renewers;
+    private final long maxLifeTime;
+
+    private CreateDelegationTokenRequest(short version, List<KafkaPrincipal> renewers, long maxLifeTime) {
+        super(version);
+        this.maxLifeTime = maxLifeTime;
+        this.renewers = renewers;
+    }
+
+    public CreateDelegationTokenRequest(Struct struct, short version) {
+        super(version);
+        maxLifeTime = struct.getLong(MAX_LIFE_TIME_KEY_NAME);
+        Object[] renewerArray = struct.getArray(RENEWERS_KEY_NAME);
+        renewers = new ArrayList<>();
+        if (renewerArray != null) {
+            for (Object renewerObj : renewerArray) {
+                Struct renewerObjStruct = (Struct) renewerObj;
+                String principalType = renewerObjStruct.get(PRINCIPAL_TYPE);
+                String principalName = renewerObjStruct.get(PRINCIPAL_NAME);
+                renewers.add(new KafkaPrincipal(principalType, principalName));
+            }
+        }
+    }
+
+    public static CreateDelegationTokenRequest parse(ByteBuffer buffer, short version) {
+        return new CreateDelegationTokenRequest(ApiKeys.CREATE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
+    }
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{TOKEN_CREATE_REQUEST_V0};
+    }
+
+    @Override
+    protected Struct toStruct() {
+        short version = version();
+        Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.requestSchema(version));
+        Object[] renewersArray = new Object[renewers.size()];
+
+        int i = 0;
+        for (KafkaPrincipal principal: renewers) {
+            Struct renewerStruct = struct.instance(RENEWERS_KEY_NAME);
+            renewerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType());
+            renewerStruct.set(PRINCIPAL_NAME, principal.getName());
+            renewersArray[i++] = renewerStruct;
+        }
+
+        struct.set(RENEWERS_KEY_NAME, renewersArray);
+        struct.set(MAX_LIFE_TIME_KEY_NAME, maxLifeTime);
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new CreateDelegationTokenResponse(throttleTimeMs, Errors.forException(e), KafkaPrincipal.ANONYMOUS);
+    }
+
+    public List<KafkaPrincipal> renewers() {
+        return renewers;
+    }
+
+    public long maxLifeTime() {
+        return maxLifeTime;
+    }
+
+    public static class Builder extends AbstractRequest.Builder<CreateDelegationTokenRequest> {
+        private final List<KafkaPrincipal> renewers;
+        private final long maxLifeTime;
+
+        public Builder(List<KafkaPrincipal> renewers, long maxLifeTime) {
+            super(ApiKeys.CREATE_DELEGATION_TOKEN);
+            this.renewers = renewers;
+            this.maxLifeTime = maxLifeTime;
+        }
+
+        @Override
+        public CreateDelegationTokenRequest build(short version) {
+            return new CreateDelegationTokenRequest(version, renewers, maxLifeTime);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type: CreateDelegationTokenRequest").
+                append(", renewers=").append(renewers).
+                append(", maxLifeTime=").append(maxLifeTime).
+                append(")");
+            return bld.toString();
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
new file mode 100644
index 0000000..c718cf0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
+public class CreateDelegationTokenResponse extends AbstractResponse {
+
+    private static final String OWNER_KEY_NAME = "owner";
+    private static final String ISSUE_TIMESTAMP_KEY_NAME = "issue_timestamp";
+    private static final String EXPIRY_TIMESTAMP_NAME = "expiry_timestamp";
+    private static final String MAX_TIMESTAMP_NAME = "max_timestamp";
+    private static final String TOKEN_ID_KEY_NAME = "token_id";
+    private static final String HMAC_KEY_NAME = "hmac";
+
+    private final Errors error;
+    private final long issueTimestamp;
+    private final long expiryTimestamp;
+    private final long maxTimestamp;
+    private final String tokenId;
+    private final ByteBuffer hmac;
+    private final int throttleTimeMs;
+    private KafkaPrincipal owner;
+
+    private static final Schema TOKEN_CREATE_RESPONSE_V0 = new Schema(
+        ERROR_CODE,
+        new Field(OWNER_KEY_NAME, new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME), "token owner."),
+        new Field(ISSUE_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) when this token was generated."),
+        new Field(EXPIRY_TIMESTAMP_NAME, INT64, "timestamp (in msec) at which this token expires."),
+        new Field(MAX_TIMESTAMP_NAME, INT64, "max life time of this token."),
+        new Field(TOKEN_ID_KEY_NAME, STRING, "UUID to ensure uniqueness."),
+        new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token."),
+        THROTTLE_TIME_MS);
+
+    public CreateDelegationTokenResponse(int throttleTimeMs,
+                                         Errors error,
+                                         KafkaPrincipal owner,
+                                         long issueTimestamp,
+                                         long expiryTimestamp,
+                                         long maxTimestamp,
+                                         String tokenId,
+                                         ByteBuffer hmac) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.error = error;
+        this.owner = owner;
+        this.issueTimestamp = issueTimestamp;
+        this.expiryTimestamp = expiryTimestamp;
+        this.maxTimestamp = maxTimestamp;
+        this.tokenId = tokenId;
+        this.hmac = hmac;
+    }
+
+    public CreateDelegationTokenResponse(int throttleTimeMs, Errors error, KafkaPrincipal owner) {
+        this(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {}));
+    }
+
+    public CreateDelegationTokenResponse(Struct struct) {
+        error = Errors.forCode(struct.get(ERROR_CODE));
+        Struct ownerStruct = (Struct) struct.get(OWNER_KEY_NAME);
+        String principalType = ownerStruct.get(PRINCIPAL_TYPE);
+        String principalName = ownerStruct.get(PRINCIPAL_NAME);
+        owner = new KafkaPrincipal(principalType, principalName);
+        issueTimestamp = struct.getLong(ISSUE_TIMESTAMP_KEY_NAME);
+        expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_NAME);
+        maxTimestamp = struct.getLong(MAX_TIMESTAMP_NAME);
+        tokenId = struct.getString(TOKEN_ID_KEY_NAME);
+        hmac = struct.getBytes(HMAC_KEY_NAME);
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+    }
+
+    public static CreateDelegationTokenResponse parse(ByteBuffer buffer, short version) {
+        return new CreateDelegationTokenResponse(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
+    }
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {TOKEN_CREATE_RESPONSE_V0};
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version));
+        struct.set(ERROR_CODE, error.code());
+        Struct ownerStruct = struct.instance(OWNER_KEY_NAME);
+        ownerStruct.set(PRINCIPAL_TYPE, owner.getPrincipalType());
+        ownerStruct.set(PRINCIPAL_NAME, owner.getName());
+        struct.set(OWNER_KEY_NAME, ownerStruct);
+        struct.set(ISSUE_TIMESTAMP_KEY_NAME, issueTimestamp);
+        struct.set(EXPIRY_TIMESTAMP_NAME, expiryTimestamp);
+        struct.set(MAX_TIMESTAMP_NAME, maxTimestamp);
+        struct.set(TOKEN_ID_KEY_NAME, tokenId);
+        struct.set(HMAC_KEY_NAME, hmac);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+        return struct;
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public KafkaPrincipal owner() {
+        return owner;
+    }
+
+    public long issueTimestamp() {
+        return issueTimestamp;
+    }
+
+    public long expiryTimestamp() {
+        return expiryTimestamp;
+    }
+
+    public long maxTimestamp() {
+        return maxTimestamp;
+    }
+
+    public String tokenId() {
+        return tokenId;
+    }
+
+    public byte[] hmacBytes() {
+        byte[] byteArray = new byte[hmac.remaining()];
+        hmac.get(byteArray);
+        return byteArray;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java
new file mode 100644
index 0000000..61c0055
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
+
+/**
+ * Request for the {@code DESCRIBE_DELEGATION_TOKEN} API. It carries an optional list of token
+ * owners; a null list means that all tokens should be described.
+ */
+public class DescribeDelegationTokenRequest extends AbstractRequest {
+    private static final String OWNER_KEY_NAME = "owners";
+
+    // Null when the request is not restricted to particular owners.
+    private final List<KafkaPrincipal> owners;
+
+    // Version 0 layout: a nullable array of (principal_type, name) owners.
+    public static final Schema TOKEN_DESCRIBE_REQUEST_V0 = new Schema(
+        new Field(OWNER_KEY_NAME, ArrayOf.nullable(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)), "An array of token owners."));
+
+    /**
+     * Builder producing {@link DescribeDelegationTokenRequest} instances for a given version.
+     */
+    public static class Builder extends AbstractRequest.Builder<DescribeDelegationTokenRequest> {
+        // describe token for the given list of owners, or null if we want to describe all tokens.
+        private final List<KafkaPrincipal> owners;
+
+        public Builder(List<KafkaPrincipal> owners) {
+            super(ApiKeys.DESCRIBE_DELEGATION_TOKEN);
+            this.owners = owners;
+        }
+
+        @Override
+        public DescribeDelegationTokenRequest build(short version) {
+            return new DescribeDelegationTokenRequest(version, owners);
+        }
+
+        @Override
+        public String toString() {
+            return "(type: DescribeDelegationTokenRequest"
+                + ", owners=" + owners
+                + ")";
+        }
+    }
+
+    /**
+     * Used by {@link Builder}; stores the given list reference as-is (no copy is made).
+     */
+    private DescribeDelegationTokenRequest(short version, List<KafkaPrincipal> owners) {
+        super(version);
+        this.owners = owners;
+    }
+
+    /**
+     * Rebuilds the request from its wire representation. A null owners array in the struct is
+     * preserved as a null owners list.
+     */
+    public DescribeDelegationTokenRequest(Struct struct, short versionId) {
+        super(versionId);
+
+        List<KafkaPrincipal> parsedOwners = null;
+        Object[] ownerStructs = struct.getArray(OWNER_KEY_NAME);
+        if (ownerStructs != null) {
+            parsedOwners = new ArrayList<>();
+            for (Object element : ownerStructs) {
+                Struct ownerStruct = (Struct) element;
+                parsedOwners.add(new KafkaPrincipal(ownerStruct.get(PRINCIPAL_TYPE), ownerStruct.get(PRINCIPAL_NAME)));
+            }
+        }
+        this.owners = parsedOwners;
+    }
+
+    /**
+     * @return the request schemas, indexed by version (only version 0 exists)
+     */
+    public static Schema[] schemaVersions() {
+        return new Schema[]{TOKEN_DESCRIBE_REQUEST_V0};
+    }
+
+    /**
+     * Converts this request into a struct laid out according to the schema of its version.
+     * A null owners list is written as a null array.
+     */
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.DESCRIBE_DELEGATION_TOKEN.requestSchema(version()));
+
+        Object[] ownerStructs = null;
+        if (owners != null) {
+            // One nested (principal_type, name) struct per owner, in list order.
+            List<Struct> converted = new ArrayList<>(owners.size());
+            for (KafkaPrincipal owner : owners) {
+                Struct ownerStruct = struct.instance(OWNER_KEY_NAME);
+                ownerStruct.set(PRINCIPAL_TYPE, owner.getPrincipalType());
+                ownerStruct.set(PRINCIPAL_NAME, owner.getName());
+                converted.add(ownerStruct);
+            }
+            ownerStructs = converted.toArray();
+        }
+        struct.set(OWNER_KEY_NAME, ownerStructs);
+
+        return struct;
+    }
+
+    /**
+     * Builds the error response for this request from the error derived from {@code e}.
+     */
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Errors error = Errors.forException(e);
+        return new DescribeDelegationTokenResponse(throttleTimeMs, error);
+    }
+
+    /**
+     * @return the owners to describe tokens for, or null when no owner list was given
+     */
+    public List<KafkaPrincipal> owners() {
+        return owners;
+    }
+
+    /**
+     * @return true only when an owner list was given and it contains no entries; a null
+     *         list yields false
+     */
+    public boolean ownersListEmpty() {
+        if (owners == null) {
+            return false;
+        }
+        return owners.isEmpty();
+    }
+
+    /**
+     * Parses a request of the given version out of {@code buffer}.
+     */
+    public static DescribeDelegationTokenRequest parse(ByteBuffer buffer, short version) {
+        Struct parsed = ApiKeys.DESCRIBE_DELEGATION_TOKEN.parseRequest(version, buffer);
+        return new DescribeDelegationTokenRequest(parsed, version);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
new file mode 100644
index 0000000..dba29ea
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
+/**
+ * Response to a {@code DESCRIBE_DELEGATION_TOKEN} request. It carries an error code, the
+ * details of zero or more delegation tokens and the throttle time.
+ */
+public class DescribeDelegationTokenResponse extends AbstractResponse {
+
+    private static final String TOKEN_DETAILS_KEY_NAME = "token_details";
+    private static final String ISSUE_TIMESTAMP_KEY_NAME = "issue_timestamp";
+    private static final String EXPIRY_TIMESTAMP_NAME = "expiry_timestamp";
+    private static final String MAX_TIMESTAMP_NAME = "max_timestamp";
+    private static final String TOKEN_ID_KEY_NAME = "token_id";
+    private static final String HMAC_KEY_NAME = "hmac";
+    private static final String OWNER_KEY_NAME = "owner";
+    private static final String RENEWERS_KEY_NAME = "renewers";
+
+    private final Errors error;
+    private final List<DelegationToken> tokens;
+    private final int throttleTimeMs;
+
+    // Layout of a single entry of the token_details array.
+    private static final Schema TOKEN_DETAILS_V0 = new Schema(
+        new Field(OWNER_KEY_NAME, new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME), "token owner."),
+        new Field(ISSUE_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) when this token was generated."),
+        new Field(EXPIRY_TIMESTAMP_NAME, INT64, "timestamp (in msec) at which this token expires."),
+        new Field(MAX_TIMESTAMP_NAME, INT64, "max life time of this token."),
+        new Field(TOKEN_ID_KEY_NAME, STRING, "UUID to ensure uniqueness."),
+        // This response describes tokens; the previous text ("... to be expired.") had been
+        // copied from the expire request and was wrong here.
+        new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token."),
+        new Field(RENEWERS_KEY_NAME, new ArrayOf(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)),
+            "An array of token renewers. Renewer is an Kafka PrincipalType and name string," +
+                " who is allowed to renew this token before the max lifetime expires."));
+
+    private static final Schema TOKEN_DESCRIBE_RESPONSE_V0 = new Schema(
+        ERROR_CODE,
+        new Field(TOKEN_DETAILS_KEY_NAME, new ArrayOf(TOKEN_DETAILS_V0)),
+        THROTTLE_TIME_MS);
+
+    /**
+     * Creates a response carrying the given tokens.
+     *
+     * @param throttleTimeMs throttle time to report
+     * @param error          result of the request
+     * @param tokens         tokens to describe; the list is stored by reference, not copied
+     */
+    public DescribeDelegationTokenResponse(int throttleTimeMs, Errors error, List<DelegationToken> tokens) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.error = error;
+        this.tokens = tokens;
+    }
+
+    /**
+     * Creates a response with an empty token list.
+     */
+    public DescribeDelegationTokenResponse(int throttleTimeMs, Errors error) {
+        this(throttleTimeMs, error, new ArrayList<DelegationToken>());
+    }
+
+    /**
+     * Rebuilds the response from its wire representation. The throttle time falls back to
+     * {@code DEFAULT_THROTTLE_TIME} when the struct does not carry it.
+     */
+    public DescribeDelegationTokenResponse(Struct struct) {
+        Object[] tokenDetailStructs = struct.getArray(TOKEN_DETAILS_KEY_NAME);
+        List<DelegationToken> tokens = new ArrayList<>();
+
+        for (Object tokenDetailObj : tokenDetailStructs) {
+            Struct tokenDetailStruct = (Struct) tokenDetailObj;
+
+            Struct ownerStruct = (Struct) tokenDetailStruct.get(OWNER_KEY_NAME);
+            KafkaPrincipal owner = new KafkaPrincipal(ownerStruct.get(PRINCIPAL_TYPE), ownerStruct.get(PRINCIPAL_NAME));
+            long issueTimestamp = tokenDetailStruct.getLong(ISSUE_TIMESTAMP_KEY_NAME);
+            long expiryTimestamp = tokenDetailStruct.getLong(EXPIRY_TIMESTAMP_NAME);
+            long maxTimestamp = tokenDetailStruct.getLong(MAX_TIMESTAMP_NAME);
+            String tokenId = tokenDetailStruct.getString(TOKEN_ID_KEY_NAME);
+            ByteBuffer hmac = tokenDetailStruct.getBytes(HMAC_KEY_NAME);
+
+            Object[] renewerArray = tokenDetailStruct.getArray(RENEWERS_KEY_NAME);
+            List<KafkaPrincipal> renewers = new ArrayList<>();
+            // A missing renewers array is treated the same as an empty one.
+            if (renewerArray != null) {
+                for (Object renewerObj : renewerArray) {
+                    Struct renewerObjStruct = (Struct) renewerObj;
+                    String principalType = renewerObjStruct.get(PRINCIPAL_TYPE);
+                    String principalName = renewerObjStruct.get(PRINCIPAL_NAME);
+                    renewers.add(new KafkaPrincipal(principalType, principalName));
+                }
+            }
+
+            TokenInformation tokenInfo = new TokenInformation(tokenId, owner, renewers, issueTimestamp, maxTimestamp, expiryTimestamp);
+
+            // Copy through a duplicate: the buffer belongs to the parsed struct, and a relative
+            // get on it would advance its position and leave the struct's hmac field drained.
+            byte[] hmacBytes = new byte[hmac.remaining()];
+            hmac.duplicate().get(hmacBytes);
+
+            DelegationToken tokenDetails = new DelegationToken(tokenInfo, hmacBytes);
+            tokens.add(tokenDetails);
+        }
+
+        this.tokens = tokens;
+        error = Errors.forCode(struct.get(ERROR_CODE));
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+    }
+
+    /**
+     * Parses a response of the given version out of {@code buffer}.
+     */
+    public static DescribeDelegationTokenResponse parse(ByteBuffer buffer, short version) {
+        return new DescribeDelegationTokenResponse(ApiKeys.DESCRIBE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    /**
+     * Converts this response into a struct laid out according to the schema of {@code version},
+     * writing one token_details entry per token in list order.
+     */
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.DESCRIBE_DELEGATION_TOKEN.responseSchema(version));
+        List<Struct> tokenDetailsStructs = new ArrayList<>(tokens.size());
+
+        struct.set(ERROR_CODE, error.code());
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            Struct tokenDetailStruct = struct.instance(TOKEN_DETAILS_KEY_NAME);
+            Struct ownerStruct = tokenDetailStruct.instance(OWNER_KEY_NAME);
+            ownerStruct.set(PRINCIPAL_TYPE, tokenInfo.owner().getPrincipalType());
+            ownerStruct.set(PRINCIPAL_NAME, tokenInfo.owner().getName());
+            tokenDetailStruct.set(OWNER_KEY_NAME, ownerStruct);
+            tokenDetailStruct.set(ISSUE_TIMESTAMP_KEY_NAME, tokenInfo.issueTimestamp());
+            tokenDetailStruct.set(EXPIRY_TIMESTAMP_NAME, tokenInfo.expiryTimestamp());
+            tokenDetailStruct.set(MAX_TIMESTAMP_NAME, tokenInfo.maxTimestamp());
+            tokenDetailStruct.set(TOKEN_ID_KEY_NAME, tokenInfo.tokenId());
+            tokenDetailStruct.set(HMAC_KEY_NAME, ByteBuffer.wrap(token.hmac()));
+
+            Object[] renewersArray = new Object[tokenInfo.renewers().size()];
+
+            int i = 0;
+            for (KafkaPrincipal principal: tokenInfo.renewers()) {
+                Struct renewerStruct = tokenDetailStruct.instance(RENEWERS_KEY_NAME);
+                renewerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType());
+                renewerStruct.set(PRINCIPAL_NAME, principal.getName());
+                renewersArray[i++] = renewerStruct;
+            }
+
+            tokenDetailStruct.set(RENEWERS_KEY_NAME, renewersArray);
+
+            tokenDetailsStructs.add(tokenDetailStruct);
+        }
+        struct.set(TOKEN_DETAILS_KEY_NAME, tokenDetailsStructs.toArray());
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+
+        return struct;
+    }
+
+    /**
+     * @return the response schemas, indexed by version (only version 0 exists)
+     */
+    public static Schema[] schemaVersions() {
+        return new Schema[]{TOKEN_DESCRIBE_RESPONSE_V0};
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    /**
+     * @return the described tokens; this is the internal list, not a copy
+     */
+    public List<DelegationToken> tokens() {
+        return tokens;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
new file mode 100644
index 0000000..0d43440
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
+public class ExpireDelegationTokenRequest extends AbstractRequest {
+
+    private static final String HMAC_KEY_NAME = "hmac";
+    private static final String EXPIRY_TIME_PERIOD_KEY_NAME = "expiry_time_period";
+    private final ByteBuffer hmac;
+    private final long expiryTimePeriod;
+
+    private static final Schema TOKEN_EXPIRE_REQUEST_V0 = new Schema(
+        new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be expired."),
+        new Field(EXPIRY_TIME_PERIOD_KEY_NAME, INT64, "expiry time period in milli seconds."));
+
+    private ExpireDelegationTokenRequest(short version, ByteBuffer hmac, long expiryTimePeriod) {
+        super(version);
+        // Invoked by Builder.build(); the values stored here are what toStruct() serializes.
+        this.hmac = hmac;
+        this.expiryTimePeriod = expiryTimePeriod;
+    }
+
+    public ExpireDelegationTokenRequest(Struct struct, short versionId) {
+        super(versionId);
+
+        hmac = struct.getBytes(HMAC_KEY_NAME);
+        expiryTimePeriod = struct.getLong(EXPIRY_TIME_PERIOD_KEY_NAME);
+    }
+
+    public static ExpireDelegationTokenRequest parse(ByteBuffer buffer, short version) {
+        return new ExpireDelegationTokenRequest(ApiKeys.EXPIRE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
+    }
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {TOKEN_EXPIRE_REQUEST_V0};
+    }
+
+    @Override
+    protected Struct toStruct() {
+        short version = version();
+        Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.requestSchema(version));
+
+        struct.set(HMAC_KEY_NAME, hmac);
+        struct.set(EXPIRY_TIME_PERIOD_KEY_NAME, expiryTimePeriod);
+
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new ExpireDelegationTokenResponse(throttleTimeMs, Errors.forException(e));
+    }
+
+    public ByteBuffer hmac() {
+        return hmac;
+    }
+
+    public long expiryTimePeriod() {
+        return expiryTimePeriod;
+    }
+
+    public static class Builder extends AbstractRequest.Builder<ExpireDelegationTokenRequest> {
+        private final ByteBuffer hmac;
+        private final long expiryTimePeriod;
+
+        public Builder(ByteBuffer hmac, long expiryTimePeriod) {
+            super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
+            this.hmac = hmac;
+            this.expiryTimePeriod = expiryTimePeriod;
+        }
+
+        @Override
+        public ExpireDelegationTokenRequest build(short version) {
+            return new ExpireDelegationTokenRequest(version, hmac, expiryTimePeriod);
+        }
+
+        @Override
+        public String toString() {
+            // Concatenating the ByteBuffer renders its position/limit/capacity summary,
+            // not the HMAC bytes themselves.
+            return "(type: ExpireDelegationTokenRequest"
+                + ", hmac=" + hmac
+                + ", expiryTimePeriod=" + expiryTimePeriod
+                + ")";
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
new file mode 100644
index 0000000..f7e0ec4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
+public class ExpireDelegationTokenResponse extends AbstractResponse {
+
+    private static final String EXPIRY_TIMESTAMP_KEY_NAME = "expiry_timestamp";
+
+    private final Errors error;
+    private final long expiryTimestamp;
+    private final int throttleTimeMs;
+
+    private  static final Schema TOKEN_EXPIRE_RESPONSE_V0 = new Schema(
+        ERROR_CODE,
+        new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
+        THROTTLE_TIME_MS);
+
+    public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.error = error;
+        this.expiryTimestamp = expiryTimestamp;
+    }
+
+    public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error) {
+        this(throttleTimeMs, error, -1);
+    }
+
+    public ExpireDelegationTokenResponse(Struct struct) {
+        error = Errors.forCode(struct.get(ERROR_CODE));
+        this.expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_KEY_NAME);
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+    }
+
+    public static ExpireDelegationTokenResponse parse(ByteBuffer buffer, short version) {
+        return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
+    }
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {TOKEN_EXPIRE_RESPONSE_V0};
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public long expiryTimestamp() {
+        return expiryTimestamp;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version));
+
+        struct.set(ERROR_CODE, error.code());
+        struct.set(EXPIRY_TIMESTAMP_KEY_NAME, expiryTimestamp);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
new file mode 100644
index 0000000..4a4b762
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
+public class RenewDelegationTokenRequest extends AbstractRequest {
+
+    private static final String HMAC_KEY_NAME = "hmac";
+    private static final String RENEW_TIME_PERIOD_KEY_NAME = "renew_time_period";
+    private final ByteBuffer hmac;
+    private final long renewTimePeriod;
+
+    public static final Schema TOKEN_RENEW_REQUEST_V0 = new Schema(
+        new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be renewed."),
+        new Field(RENEW_TIME_PERIOD_KEY_NAME, INT64, "Renew time period in milli seconds."));
+
+    private RenewDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) {
+        super(version);
+
+        this.hmac = hmac;
+        this.renewTimePeriod = renewTimePeriod;
+    }
+
+    public RenewDelegationTokenRequest(Struct struct, short versionId) {
+        super(versionId);
+
+        hmac = struct.getBytes(HMAC_KEY_NAME);
+        renewTimePeriod = struct.getLong(RENEW_TIME_PERIOD_KEY_NAME);
+    }
+
+    public static RenewDelegationTokenRequest parse(ByteBuffer buffer, short version) {
+        return new RenewDelegationTokenRequest(ApiKeys.RENEW_DELEGATION_TOKEN.parseRequest(version, buffer), version);
+    }
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {TOKEN_RENEW_REQUEST_V0};
+    }
+
+    @Override
+    protected Struct toStruct() {
+        short version = version();
+        Struct struct = new Struct(ApiKeys.RENEW_DELEGATION_TOKEN.requestSchema(version));
+
+        struct.set(HMAC_KEY_NAME, hmac);
+        struct.set(RENEW_TIME_PERIOD_KEY_NAME, renewTimePeriod);
+
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new RenewDelegationTokenResponse(throttleTimeMs, Errors.forException(e));
+    }
+
+    public ByteBuffer hmac() {
+        return hmac;
+    }
+
+    public long renewTimePeriod() {
+        return renewTimePeriod;
+    }
+
+    public static class Builder extends AbstractRequest.Builder<RenewDelegationTokenRequest> {
+        private final ByteBuffer hmac;
+        private final long renewTimePeriod;
+
+        public Builder(ByteBuffer hmac, long renewTimePeriod) {
+            super(ApiKeys.RENEW_DELEGATION_TOKEN);
+            this.hmac = hmac;
+            this.renewTimePeriod = renewTimePeriod;
+        }
+
+        @Override
+        public RenewDelegationTokenRequest build(short version) {
+            return new RenewDelegationTokenRequest(version, hmac, renewTimePeriod);
+        }
+
+        @Override
+        public String toString() {
+            // Concatenating the ByteBuffer renders its position/limit/capacity summary,
+            // not the HMAC bytes themselves.
+            return "(type: RenewDelegationTokenRequest"
+                + ", hmac=" + hmac
+                + ", renewTimePeriod=" + renewTimePeriod
+                + ")";
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
new file mode 100644
index 0000000..1885b48
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
+public class RenewDelegationTokenResponse extends AbstractResponse {
+
+    private static final String EXPIRY_TIMESTAMP_KEY_NAME = "expiry_timestamp";
+
+    private final Errors error;
+    private final long expiryTimestamp;
+    private final int throttleTimeMs;
+
+    private static final Schema TOKEN_RENEW_RESPONSE_V0 = new Schema(
+            ERROR_CODE,
+            new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
+            THROTTLE_TIME_MS);
+
+    public RenewDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.error = error;
+        this.expiryTimestamp = expiryTimestamp;
+    }
+
+    public RenewDelegationTokenResponse(int throttleTimeMs, Errors error) {
+        this(throttleTimeMs, error, -1);
+    }
+
+    public RenewDelegationTokenResponse(Struct struct) {
+        error = Errors.forCode(struct.get(ERROR_CODE));
+        expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_KEY_NAME);
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+    }
+
+    public static RenewDelegationTokenResponse parse(ByteBuffer buffer, short version) {
+        return new RenewDelegationTokenResponse(ApiKeys.RENEW_DELEGATION_TOKEN.responseSchema(version).read(buffer));
+    }
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {TOKEN_RENEW_RESPONSE_V0};
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.RENEW_DELEGATION_TOKEN.responseSchema(version));
+
+        struct.set(ERROR_CODE, error.code());
+        struct.set(EXPIRY_TIMESTAMP_KEY_NAME, expiryTimestamp);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public long expiryTimestamp() {
+        return expiryTimestamp;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
index d83382d..2ce653f 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
@@ -58,7 +58,12 @@ public enum ResourceType {
     /**
      * A transactional ID.
      */
-    TRANSACTIONAL_ID((byte) 5);
+    TRANSACTIONAL_ID((byte) 5),
+
+    /**
+     * A delegation token ID.
+     */
+    DELEGATION_TOKEN((byte) 6);
 
     private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>();
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
index ed3c956..10bf76d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -47,10 +47,16 @@ public class KafkaPrincipal implements Principal {
 
     private final String principalType;
     private final String name;
+    private boolean tokenAuthenticated;
 
     public KafkaPrincipal(String principalType, String name) {
+       this(principalType, name, false);
+    }
+
+    public KafkaPrincipal(String principalType, String name, boolean tokenauth) {
         this.principalType = requireNonNull(principalType, "Principal type cannot be null");
         this.name = requireNonNull(name, "Principal name cannot be null");
+        this.tokenAuthenticated =  tokenauth;
     }
 
     /**
@@ -83,8 +89,9 @@ public class KafkaPrincipal implements Principal {
 
     @Override
     public int hashCode() {
-        int result = principalType.hashCode();
-        result = 31 * result + name.hashCode();
+        int result = principalType != null ? principalType.hashCode() : 0;
+        result = 31 * result + (name != null ? name.hashCode() : 0);
+        result = 31 * result + (tokenAuthenticated ? 1 : 0);
         return result;
     }
 
@@ -97,4 +104,8 @@ public class KafkaPrincipal implements Principal {
         return principalType;
     }
 
+    public boolean tokenAuthenticated() {
+        return tokenAuthenticated;
+    }
 }
+
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
index 30b0a3e..7404238 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
 import org.apache.kafka.common.security.auth.SslAuthenticationContext;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
 
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
@@ -118,8 +119,13 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
             SaslServer saslServer = ((SaslAuthenticationContext) context).server();
             if (SaslConfigs.GSSAPI_MECHANISM.equals(saslServer.getMechanismName()))
                 return applyKerberosShortNamer(saslServer.getAuthorizationID());
-            else
-                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
+            else {
+                Boolean isTokenAuthenticated = (Boolean) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG);
+                if (isTokenAuthenticated != null && isTokenAuthenticated)
+                    return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(), true);
+                else
+                    return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
+            }
         } else {
             throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 4756387..de96cef 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -28,6 +28,7 @@ import javax.security.sasl.RealmCallback;
 
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.scram.DelegationTokenAuthenticationCallback;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL mechanism
@@ -80,7 +81,12 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler {
                 ac.setAuthorized(authId.equals(authzId));
                 if (ac.isAuthorized())
                     ac.setAuthorizedID(authzId);
-            } else {
+            } else if (callback instanceof DelegationTokenAuthenticationCallback) {
+                DelegationTokenAuthenticationCallback tc = (DelegationTokenAuthenticationCallback) callback;
+                if (!isKerberos && subject != null && !subject.getPublicCredentials(Boolean.class).isEmpty()) {
+                    tc.tokenauth(subject.getPublicCredentials(Boolean.class).iterator().next());
+                }
+            }  else {
                 throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 1a2aaf4..6fcca57 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -56,6 +56,7 @@ import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.ScramMechanism;
 import org.apache.kafka.common.security.scram.ScramServerCallbackHandler;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
 import org.ietf.jgss.GSSContext;
 import org.ietf.jgss.GSSCredential;
 import org.ietf.jgss.GSSException;
@@ -107,6 +108,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private final Set<String> enabledMechanisms;
     private final Map<String, ?> configs;
     private final KafkaPrincipalBuilder principalBuilder;
+    private final DelegationTokenCache tokenCache;
 
     // Current SASL state
     private SaslState saslState = SaslState.INITIAL_REQUEST;
@@ -132,7 +134,8 @@ public class SaslServerAuthenticator implements Authenticator {
                                    CredentialCache credentialCache,
                                    ListenerName listenerName,
                                    SecurityProtocol securityProtocol,
-                                   TransportLayer transportLayer) throws IOException {
+                                   TransportLayer transportLayer,
+                                   DelegationTokenCache tokenCache) throws IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
         this.connectionId = connectionId;
@@ -142,7 +145,7 @@ public class SaslServerAuthenticator implements Authenticator {
         this.listenerName = listenerName;
         this.securityProtocol = securityProtocol;
         this.enableKafkaSaslAuthenticateHeaders = false;
-
+        this.tokenCache = tokenCache;
         this.transportLayer = transportLayer;
 
         this.configs = configs;
@@ -162,7 +165,7 @@ public class SaslServerAuthenticator implements Authenticator {
         if (!ScramMechanism.isScram(mechanism))
             callbackHandler = new SaslServerCallbackHandler(jaasContext);
         else
-            callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class));
+            callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class), tokenCache);
         callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
         if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
             saslServer = createSaslKerberosServer(callbackHandler, configs, subject);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
similarity index 74%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
index 931210a..df6e849 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
@@ -14,18 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.common.security.scram;
 
 import javax.security.auth.callback.Callback;
 
-public class ScramCredentialCallback implements Callback {
-    private ScramCredential scramCredential;
+public class DelegationTokenAuthenticationCallback implements Callback {
+    private boolean tokenauth;
 
-    public ScramCredential scramCredential() {
-        return scramCredential;
+    public String extension() {
+        return ScramLoginModule.TOKEN_AUTH_CONFIG + "=" +  Boolean.toString(tokenauth);
     }
 
-    public void scramCredential(ScramCredential scramCredential) {
-        this.scramCredential = scramCredential;
+    public void tokenauth(Boolean tokenauth) {
+        this.tokenauth = tokenauth;
     }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
index 931210a..7f3601c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
@@ -20,12 +20,36 @@ import javax.security.auth.callback.Callback;
 
 public class ScramCredentialCallback implements Callback {
     private ScramCredential scramCredential;
+    private boolean tokenAuthenticated;
+    private String tokenOwner;
+    private String mechanism;
+
+    public ScramCredentialCallback(boolean tokenAuthenticated, String mechanism) {
+        this.tokenAuthenticated = tokenAuthenticated;
+        this.mechanism = mechanism;
+    }
 
     public ScramCredential scramCredential() {
         return scramCredential;
     }
 
+    public boolean tokenauth() {
+        return tokenAuthenticated;
+    }
+
     public void scramCredential(ScramCredential scramCredential) {
         this.scramCredential = scramCredential;
     }
+
+    public void tokenOwner(String tokenOwner) {
+        this.tokenOwner = tokenOwner;
+    }
+
+    public String tokenOwner() {
+        return tokenOwner;
+    }
+
+    public String mechanism() {
+        return mechanism;
+    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
index c087a32..8000f4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
@@ -27,6 +27,7 @@ public class ScramLoginModule implements LoginModule {
 
     private static final String USERNAME_CONFIG = "username";
     private static final String PASSWORD_CONFIG = "password";
+    public static final String TOKEN_AUTH_CONFIG = "tokenauth";
 
     static {
         ScramSaslClientProvider.initialize();
@@ -41,6 +42,9 @@ public class ScramLoginModule implements LoginModule {
         String password = (String) options.get(PASSWORD_CONFIG);
         if (password != null)
             subject.getPrivateCredentials().add(password);
+
+        Boolean useTokenAuthentication = "true".equalsIgnoreCase((String) options.get(TOKEN_AUTH_CONFIG));
+        subject.getPublicCredentials().add(useTokenAuthentication);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
index 1ad7266..e697ea5 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.security.scram;
 import org.apache.kafka.common.utils.Base64;
 
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -64,16 +66,18 @@ public class ScramMessages {
      */
     public static class ClientFirstMessage extends AbstractScramMessage {
         private static final Pattern PATTERN = Pattern.compile(String.format(
-                "n,(a=(?<authzid>%s))?,%sn=(?<saslname>%s),r=(?<nonce>%s)%s",
+                "n,(a=(?<authzid>%s))?,%sn=(?<saslname>%s),r=(?<nonce>%s)(?<extensions>%s)",
                 SASLNAME,
                 RESERVED,
                 SASLNAME,
                 PRINTABLE,
                 EXTENSIONS));
 
+
         private final String saslName;
         private final String nonce;
         private final String authorizationId;
+        private final String extensions;
         public ClientFirstMessage(byte[] messageBytes) throws SaslException {
             String message = toMessage(messageBytes);
             Matcher matcher = PATTERN.matcher(message);
@@ -83,10 +87,13 @@ public class ScramMessages {
             this.authorizationId = authzid != null ? authzid : "";
             this.saslName = matcher.group("saslname");
             this.nonce = matcher.group("nonce");
+            String extString = matcher.group("extensions");
+            this.extensions = extString.startsWith(",") ? extString.substring(1) : extString;
         }
-        public ClientFirstMessage(String saslName, String nonce) {
+        public ClientFirstMessage(String saslName, String nonce, String extensions) {
             this.saslName = saslName;
             this.nonce = nonce;
+            this.extensions = extensions;
             this.authorizationId = ""; // Optional authzid not specified in gs2-header
         }
         public String saslName() {
@@ -101,8 +108,29 @@ public class ScramMessages {
         public String gs2Header() {
             return "n," + authorizationId + ",";
         }
+        public String extensions() {
+            return extensions;
+        }
+
+        public Map<String, String> extensionsAsMap() { // parses the comma-separated "name=value" extensions string into a map
+            Map<String, String> map = new HashMap<>();
+
+            if (extensions.isEmpty())
+                return map; // no extensions: an empty map is returned
+
+            String[] attrvals = extensions.split(",");
+            for (String attrval: attrvals) {
+                String[] array = attrval.split("=", 2); // limit 2: only the first '=' separates the name from the value
+                map.put(array[0], array[1]); // NOTE(review): assumes every entry contains '=' - presumably guaranteed by the EXTENSIONS pattern; confirm
+            }
+            return map;
+        }
+
         public String clientFirstMessageBare() {
-            return String.format("n=%s,r=%s", saslName, nonce);
+            if (extensions.isEmpty())
+                return String.format("n=%s,r=%s", saslName, nonce);
+            else
+                return String.format("n=%s,r=%s,%s", saslName, nonce, extensions);
         }
         String toMessage() {
             return gs2Header() + clientFirstMessageBare();
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
index e5b0f84..6b66f5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
@@ -97,14 +97,18 @@ public class ScramSaslClient implements SaslClient {
                         throw new SaslException("Expected empty challenge");
                     clientNonce = formatter.secureRandomString();
                     NameCallback nameCallback = new NameCallback("Name:");
+                    DelegationTokenAuthenticationCallback tokenAuthCallback = new DelegationTokenAuthenticationCallback();
+
                     try {
-                        callbackHandler.handle(new Callback[]{nameCallback});
+                        callbackHandler.handle(new Callback[]{nameCallback, tokenAuthCallback});
                     } catch (IOException | UnsupportedCallbackException e) {
                         throw new SaslException("User name could not be obtained", e);
                     }
+
                     String username = nameCallback.getName();
                     String saslName = formatter.saslName(username);
-                    this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce);
+                    String extension = tokenAuthCallback.extension();
+                    this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extension);
                     setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
                     return clientFirstMessage.toBytes();
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
index b250e15..94b92b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
@@ -66,6 +66,8 @@ public class ScramSaslServer implements SaslServer {
     private ClientFirstMessage clientFirstMessage;
     private ServerFirstMessage serverFirstMessage;
     private ScramCredential scramCredential;
+    private boolean tokenAuthentication;
+    private String tokenOwner;
 
     public ScramSaslServer(ScramMechanism mechanism, Map<String, ?> props, CallbackHandler callbackHandler) throws NoSuchAlgorithmException {
         this.mechanism = mechanism;
@@ -93,9 +95,14 @@ public class ScramSaslServer implements SaslServer {
                     try {
                         String saslName = clientFirstMessage.saslName();
                         this.username = formatter.username(saslName);
+                        Map<String, String> extensions = clientFirstMessage.extensionsAsMap();
+                        this.tokenAuthentication = "true".equalsIgnoreCase(extensions.get(ScramLoginModule.TOKEN_AUTH_CONFIG));
                         NameCallback nameCallback = new NameCallback("username", username);
-                        ScramCredentialCallback credentialCallback = new ScramCredentialCallback();
+                        ScramCredentialCallback credentialCallback = new ScramCredentialCallback(tokenAuthentication, getMechanismName());
                         callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
+                        this.tokenOwner = credentialCallback.tokenOwner();
+                        if (tokenAuthentication && tokenOwner == null)
+                            throw new SaslException("Token Authentication failed: Invalid tokenId : " + username);
                         this.scramCredential = credentialCallback.scramCredential();
                         if (scramCredential == null)
                             throw new SaslException("Authentication failed: Invalid user credentials");
@@ -143,6 +150,10 @@ public class ScramSaslServer implements SaslServer {
     public String getAuthorizationID() {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
+
+        if (tokenAuthentication)
+            return tokenOwner; // return token owner as principal for this session
+
         return username;
     }
 
@@ -155,7 +166,11 @@ public class ScramSaslServer implements SaslServer {
     public Object getNegotiatedProperty(String propName) {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
-        return null;
+
+        if (ScramLoginModule.TOKEN_AUTH_CONFIG.equals(propName))
+            return tokenAuthentication;
+        else
+            return null;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
index d3b245d..a064e8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
@@ -27,13 +27,17 @@ import javax.security.auth.callback.UnsupportedCallbackException;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.security.authenticator.AuthCallbackHandler;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
 
 public class ScramServerCallbackHandler implements AuthCallbackHandler {
 
     private final CredentialCache.Cache<ScramCredential> credentialCache;
+    private final DelegationTokenCache tokenCache;
 
-    public ScramServerCallbackHandler(CredentialCache.Cache<ScramCredential> credentialCache) {
+    public ScramServerCallbackHandler(CredentialCache.Cache<ScramCredential> credentialCache,
+                                      DelegationTokenCache tokenCache) {
         this.credentialCache = credentialCache;
+        this.tokenCache = tokenCache;
     }
 
     @Override
@@ -42,9 +46,14 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler {
         for (Callback callback : callbacks) {
             if (callback instanceof NameCallback)
                 username = ((NameCallback) callback).getDefaultName();
-            else if (callback instanceof ScramCredentialCallback)
-                ((ScramCredentialCallback) callback).scramCredential(credentialCache.get(username));
-            else
+            else if (callback instanceof ScramCredentialCallback) {
+                ScramCredentialCallback sc = (ScramCredentialCallback) callback;
+                if (sc.tokenauth()) {
+                    sc.scramCredential(tokenCache.credential(sc.mechanism(), username));
+                    sc.tokenOwner(tokenCache.owner(username));
+                } else
+                    sc.scramCredential(credentialCache.get(username));
+            } else
                 throw new UnsupportedCallbackException(callback);
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
new file mode 100644
index 0000000..05ccbda
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.token.delegation;
+
+import org.apache.kafka.common.utils.Base64;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+public class DelegationToken { // pairs a token's TokenInformation with its hmac bytes
+    private TokenInformation tokenInformation;
+    private byte[] hmac;
+
+    public DelegationToken(TokenInformation tokenInformation, byte[] hmac) {
+        this.tokenInformation = tokenInformation;
+        this.hmac = hmac; // the caller's array is stored as-is, no copy is taken
+    }
+
+    public TokenInformation tokenInfo() {
+        return tokenInformation;
+    }
+
+    public byte[] hmac() {
+        return hmac; // returns the internal array itself, not a copy
+    }
+
+    public String hmacAsBase64String() {
+        return Base64.encoder().encodeToString(hmac);
+    }
+
+    public ByteBuffer hmacBuffer() {
+        return ByteBuffer.wrap(hmac); // the buffer is backed by the internal array
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DelegationToken token = (DelegationToken) o;
+
+        if (tokenInformation != null ? !tokenInformation.equals(token.tokenInformation) : token.tokenInformation != null) {
+            return false;
+        }
+        return Arrays.equals(hmac, token.hmac); // hmac arrays are compared by content
+    }
+
+    @Override
+    public int hashCode() {
+        int result = tokenInformation != null ? tokenInformation.hashCode() : 0;
+        result = 31 * result + Arrays.hashCode(hmac); // content-based hash, consistent with equals
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "DelegationToken{" +
+            "tokenInformation=" + tokenInformation +
+            ", hmac=[*******]" + // the hmac value is masked rather than printed
+            '}';
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
new file mode 100644
index 0000000..78575b8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.token.delegation;
+
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.ScramCredentialUtils;
+import org.apache.kafka.common.security.scram.ScramMechanism;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DelegationTokenCache { // holds token info, per-mechanism SCRAM credentials and the hmac->tokenId index
+
+    private CredentialCache credentialCache = new CredentialCache();
+    //Cache to hold all the tokens
+    private Map<String, TokenInformation> tokenCache = new ConcurrentHashMap<>();
+    //Cache to hold hmac->tokenId mapping. This is required for renew, expire requests
+    private Map<String, String> hmacIDCache = new ConcurrentHashMap<>();
+
+    public DelegationTokenCache(Collection<String> scramMechanisms) {
+        //Create caches for scramMechanisms
+        ScramCredentialUtils.createCache(credentialCache, scramMechanisms);
+    }
+
+    public ScramCredential credential(String mechanism, String tokenId) {
+        CredentialCache.Cache<ScramCredential> cache = credentialCache.cache(mechanism, ScramCredential.class);
+        return cache == null ? null : cache.get(tokenId); // null when no cache exists for the mechanism
+    }
+
+    public String owner(String tokenId) {
+        TokenInformation tokenInfo = tokenCache.get(tokenId);
+        return tokenInfo == null ? null : tokenInfo.owner().getName(); // null when the token id is not cached
+    }
+
+    public void updateCache(DelegationToken token, Map<String, ScramCredential> scramCredentialMap) {
+        //Update TokenCache
+        String tokenId =  token.tokenInfo().tokenId();
+        addToken(tokenId, token.tokenInfo());
+        String hmac = token.hmacAsBase64String();
+        //Update Scram Credentials
+        updateCredentials(tokenId, scramCredentialMap);
+        //Update hmac-id cache
+        hmacIDCache.put(hmac, tokenId);
+    }
+
+
+    public void removeCache(String tokenId) {
+        removeToken(tokenId);
+        updateCredentials(tokenId, new HashMap<String, ScramCredential>()); // empty map: updateCredentials removes the token's credential from every mechanism cache
+    }
+
+    public TokenInformation tokenForHmac(String base64hmac) {
+        String tokenId = hmacIDCache.get(base64hmac);
+        return tokenId == null ? null : tokenCache.get(tokenId);
+    }
+
+    public TokenInformation addToken(String tokenId, TokenInformation tokenInfo) {
+        return tokenCache.put(tokenId, tokenInfo);
+    }
+
+    public void removeToken(String tokenId) {
+        TokenInformation tokenInfo = tokenCache.remove(tokenId);
+        if (tokenInfo != null) {
+            hmacIDCache.values().remove(tokenInfo.tokenId()); // hmacIDCache is keyed by hmac, so the entry is removed by its tokenId value (linear scan of the values view)
+        }
+    }
+
+    public Collection<TokenInformation> tokens() {
+        return tokenCache.values();
+    }
+
+    public TokenInformation token(String tokenId) {
+        return tokenCache.get(tokenId);
+    }
+
+    public CredentialCache.Cache<ScramCredential> credentialCache(String mechanism) {
+        return credentialCache.cache(mechanism, ScramCredential.class);
+    }
+
+    private void updateCredentials(String tokenId, Map<String, ScramCredential> scramCredentialMap) {
+        for (String mechanism : ScramMechanism.mechanismNames()) {
+            CredentialCache.Cache<ScramCredential> cache = credentialCache.cache(mechanism, ScramCredential.class);
+            if (cache != null) {
+                ScramCredential credential = scramCredentialMap.get(mechanism);
+                if (credential == null) {
+                    cache.remove(tokenId); // a mechanism missing from the map means its credential is dropped
+                } else {
+                    cache.put(tokenId, credential);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
new file mode 100644
index 0000000..1d500d2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.token.delegation;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class TokenInformation { // metadata of a delegation token: id, owner, renewers and its timestamps
+
+    private KafkaPrincipal owner;
+    private Collection<KafkaPrincipal> renewers;
+    private long issueTimestamp;
+    private long maxTimestamp;
+    private long expiryTimestamp;
+    private String tokenId;
+
+    public TokenInformation(String tokenId, KafkaPrincipal owner, Collection<KafkaPrincipal> renewers,
+                            long issueTimestamp, long maxTimestamp, long expiryTimestamp) {
+        this.tokenId = tokenId;
+        this.owner = owner;
+        this.renewers = renewers; // the caller's collection is stored as-is, no copy is taken
+        this.issueTimestamp =  issueTimestamp;
+        this.maxTimestamp =  maxTimestamp;
+        this.expiryTimestamp =  expiryTimestamp;
+    }
+
+    public KafkaPrincipal owner() {
+        return owner;
+    }
+
+    public String ownerAsString() {
+        return owner.toString();
+    }
+
+    public Collection<KafkaPrincipal> renewers() {
+        return renewers; // returns the stored collection itself
+    }
+
+    public Collection<String> renewersAsString() {
+        Collection<String> renewerList = new ArrayList<>(); // a fresh list is built on every call
+        for (KafkaPrincipal renewer : renewers) {
+            renewerList.add(renewer.toString());
+        }
+        return renewerList;
+    }
+
+    public long issueTimestamp() {
+        return issueTimestamp;
+    }
+
+    public long expiryTimestamp() {
+        return expiryTimestamp;
+    }
+
+    public void setExpiryTimestamp(long expiryTimestamp) { // expiryTimestamp is the only field with a setter
+        this.expiryTimestamp = expiryTimestamp;
+    }
+
+    public String tokenId() {
+        return tokenId;
+    }
+
+    public long maxTimestamp() {
+        return maxTimestamp;
+    }
+
+    public boolean ownerOrRenewer(KafkaPrincipal principal) { // true when principal equals the owner or is contained in renewers
+        if (owner.equals(principal) || renewers.contains(principal))
+            return true;
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "TokenInformation{" +
+            "owner=" + owner +
+            ", renewers=" + renewers +
+            ", issueTimestamp=" + issueTimestamp +
+            ", maxTimestamp=" + maxTimestamp +
+            ", expiryTimestamp=" + expiryTimestamp +
+            ", tokenId='" + tokenId + '\'' +
+            '}';
+    }
+
+    @Override
+    public boolean equals(Object o) { // compares owner, renewers, issueTimestamp, maxTimestamp and tokenId; expiryTimestamp is not compared
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        TokenInformation that = (TokenInformation) o;
+
+        if (issueTimestamp != that.issueTimestamp) {
+            return false;
+        }
+        if (maxTimestamp != that.maxTimestamp) {
+            return false;
+        }
+        if (owner != null ? !owner.equals(that.owner) : that.owner != null) {
+            return false;
+        }
+        if (renewers != null ? !renewers.equals(that.renewers) : that.renewers != null) {
+            return false;
+        }
+        return tokenId != null ? tokenId.equals(that.tokenId) : that.tokenId == null;
+    }
+
+    @Override
+    public int hashCode() { // hashes the same fields equals compares (expiryTimestamp excluded)
+        int result = owner != null ? owner.hashCode() : 0;
+        result = 31 * result + (renewers != null ? renewers.hashCode() : 0);
+        result = 31 * result + (int) (issueTimestamp ^ (issueTimestamp >>> 32)); // folds the upper and lower 32 bits of the long into an int
+        result = 31 * result + (int) (maxTimestamp ^ (maxTimestamp >>> 32));
+        result = 31 * result + (tokenId != null ? tokenId.hashCode() : 0);
+        return result;
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index e27a31d..ff8929a 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -44,6 +44,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+
 /**
  * Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
  * with the configured security protocol.
@@ -63,6 +65,7 @@ public class NioEchoServer extends Thread {
     private final CredentialCache credentialCache;
     private final Metrics metrics;
     private int numSent = 0;
+    private final DelegationTokenCache tokenCache;
 
     public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
             String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache) throws Exception {
@@ -75,10 +78,11 @@ public class NioEchoServer extends Thread {
         this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
         this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
         this.credentialCache = credentialCache;
+        this.tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames());
         if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL)
             ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
         if (channelBuilder == null)
-            channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache);
+            channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache, tokenCache);
         this.metrics = new Metrics();
         this.selector = new Selector(5000, metrics, new MockTime(), "MetricGroup", channelBuilder, new LogContext());
         acceptorThread = new AcceptorThread();
@@ -92,6 +96,10 @@ public class NioEchoServer extends Thread {
         return credentialCache;
     }
 
+    public DelegationTokenCache tokenCache() {
+        return tokenCache;
+    }
+
     @SuppressWarnings("deprecation")
     public double metricValue(String name) {
         for (Map.Entry<MetricName, KafkaMetric> entry : metrics.metrics().entrySet()) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 750fd01..a64cb45 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -72,7 +72,7 @@ public class SaslChannelBuilderTest {
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
         JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
         return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, new ListenerName("PLAIN"),
-                "PLAIN", true, null);
+                "PLAIN", true, null, null);
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
index c4b3fc4..0ded0de 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
@@ -26,7 +26,8 @@ public class ProtoUtilsTest {
     public void testDelayedAllocationSchemaDetection() throws Exception {
         //verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
         for (ApiKeys key : ApiKeys.values()) {
-            if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP || key == ApiKeys.SASL_AUTHENTICATE) {
+            if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP || key == ApiKeys.SASL_AUTHENTICATE
+                || key == ApiKeys.EXPIRE_DELEGATION_TOKEN || key == ApiKeys.RENEW_DELEGATION_TOKEN) {
                 assertTrue(key + " should require delayed allocation", key.requiresDelayedAllocation);
             } else {
                 assertFalse(key + " should not require delayed allocation", key.requiresDelayedAllocation);
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index edd1314..2771f2f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -49,6 +49,10 @@ import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.SecurityUtils;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 
@@ -62,6 +66,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -245,6 +250,18 @@ public class RequestResponseTest {
         checkRequest(createCreatePartitionsRequestWithAssignments());
         checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException());
         checkResponse(createCreatePartitionsResponse(), 0);
+        checkRequest(createCreateTokenRequest());
+        checkErrorResponse(createCreateTokenRequest(), new UnknownServerException());
+        checkResponse(createCreateTokenResponse(), 0);
+        checkRequest(createDescribeTokenRequest());
+        checkErrorResponse(createDescribeTokenRequest(), new UnknownServerException());
+        checkResponse(createDescribeTokenResponse(), 0);
+        checkRequest(createExpireTokenRequest());
+        checkErrorResponse(createExpireTokenRequest(), new UnknownServerException());
+        checkResponse(createExpireTokenResponse(), 0);
+        checkRequest(createRenewTokenRequest());
+        checkErrorResponse(createRenewTokenRequest(), new UnknownServerException());
+        checkResponse(createRenewTokenResponse(), 0);
     }
 
     @Test
@@ -1082,4 +1099,57 @@ public class RequestResponseTest {
         return new CreatePartitionsResponse(42, results);
     }
 
+    private CreateDelegationTokenRequest createCreateTokenRequest() {
+        List<KafkaPrincipal> renewers = new ArrayList<>();
+        renewers.add(SecurityUtils.parseKafkaPrincipal("User:user1"));
+        renewers.add(SecurityUtils.parseKafkaPrincipal("User:user2"));
+        return new CreateDelegationTokenRequest.Builder(renewers, System.currentTimeMillis()).build();
+    }
+
+    private CreateDelegationTokenResponse createCreateTokenResponse() {
+        return new CreateDelegationTokenResponse(20, Errors.NONE, SecurityUtils.parseKafkaPrincipal("User:user1"), System.currentTimeMillis(),
+            System.currentTimeMillis(), System.currentTimeMillis(), "token1", ByteBuffer.wrap("test".getBytes()));
+    }
+
+    private RenewDelegationTokenRequest createRenewTokenRequest() {
+        return new RenewDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+    }
+
+    private RenewDelegationTokenResponse createRenewTokenResponse() {
+        return new RenewDelegationTokenResponse(20, Errors.NONE, System.currentTimeMillis());
+    }
+
+    private ExpireDelegationTokenRequest createExpireTokenRequest() {
+        return new ExpireDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+    }
+
+    private ExpireDelegationTokenResponse createExpireTokenResponse() {
+        return new ExpireDelegationTokenResponse(20, Errors.NONE, System.currentTimeMillis());
+    }
+
+    private DescribeDelegationTokenRequest createDescribeTokenRequest() {
+        List<KafkaPrincipal> owners = new ArrayList<>();
+        owners.add(SecurityUtils.parseKafkaPrincipal("User:user1"));
+        owners.add(SecurityUtils.parseKafkaPrincipal("User:user2"));
+        return new DescribeDelegationTokenRequest.Builder(owners).build();
+    }
+
+    private DescribeDelegationTokenResponse createDescribeTokenResponse() {
+        List<KafkaPrincipal> renewers = new ArrayList<>();
+        renewers.add(SecurityUtils.parseKafkaPrincipal("User:user1"));
+        renewers.add(SecurityUtils.parseKafkaPrincipal("User:user2"));
+
+        List<DelegationToken> tokenList = new LinkedList<>();
+
+        TokenInformation tokenInfo1 = new TokenInformation("1", SecurityUtils.parseKafkaPrincipal("User:owner"), renewers,
+            System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis());
+
+        TokenInformation tokenInfo2 = new TokenInformation("2", SecurityUtils.parseKafkaPrincipal("User:owner1"), renewers,
+            System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis());
+
+        tokenList.add(new DelegationToken(tokenInfo1, "test".getBytes()));
+        tokenList.add(new DelegationToken(tokenInfo2, "test".getBytes()));
+
+        return new DescribeDelegationTokenResponse(20, Errors.NONE, tokenList);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
index d5f13bc..8e07e73 100644
--- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
@@ -41,7 +41,8 @@ public class ResourceTypeTest {
         new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false),
         new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false),
         new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false),
-        new AclResourceTypeTestInfo(ResourceType.TRANSACTIONAL_ID, 5, "transactional_id", false)
+        new AclResourceTypeTestInfo(ResourceType.TRANSACTIONAL_ID, 5, "transactional_id", false),
+        new AclResourceTypeTestInfo(ResourceType.DELEGATION_TOKEN, 6, "delegation_token", false)
     };
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index a30c09f..787f5a7 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.ScramMechanism;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -118,6 +119,7 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
 
         EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
         EasyMock.expect(server.getAuthorizationID()).andReturn("foo");
+        EasyMock.expect(server.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG)).andReturn(false);
 
         replayAll();
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index c442672..bd7fee3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -51,12 +51,15 @@ import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.ScramFormatter;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.SecurityUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -375,6 +378,39 @@ public class SaslAuthenticatorTest {
         createAndCheckClientConnection(securityProtocol, "0");
     }
 
+
+    /**
+     * Exercises delegation token authentication over SASL_SSL using SCRAM-SHA-256.
+     * The client JAAS entry carries the token id as user name and the token HMAC as password,
+     * with token authentication switched on. Three stages are checked in order:
+     * token unknown to the server's token cache (connection must fail), token info cached but
+     * no SCRAM credential registered for it (must fail), token info and credential both
+     * cached (must succeed).
+     */
+    @Test
+    public void testTokenAuthenticationOverSaslScram() throws Exception {
+        final String tokenId = "token1";
+        final String tokenHmac = "abcdefghijkl";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
+
+        // Client-side JAAS entry: token id and HMAC stand in for user name and password.
+        Map<String, Object> clientOptions = new HashMap<>();
+        clientOptions.put("username", tokenId);
+        clientOptions.put("password", tokenHmac);
+        clientOptions.put(ScramLoginModule.TOKEN_AUTH_CONFIG, "true");
+        jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), clientOptions);
+
+        server = createEchoServer(securityProtocol);
+
+        // Stage 1: the token cache knows nothing about this token id.
+        createAndCheckClientConnectionFailure(securityProtocol, "0");
+
+        // Stage 2: token info is cached, but no credential has been registered for it yet.
+        TokenInformation tokenInfo = new TokenInformation(
+            tokenId,
+            SecurityUtils.parseKafkaPrincipal("User:Owner"),
+            Collections.singleton(SecurityUtils.parseKafkaPrincipal("User:Renewer1")),
+            System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis());
+        server.tokenCache().addToken(tokenId, tokenInfo);
+        createAndCheckClientConnectionFailure(securityProtocol, "0");
+
+        // Stage 3: token info and a matching credential are both present.
+        updateTokenCredentialCache(tokenId, tokenHmac);
+        createAndCheckClientConnection(securityProtocol, "0");
+    }
+
     /**
      * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
      * prior to SASL handshake flow and that subsequent authentication succeeds
@@ -956,13 +992,13 @@ public class SaslAuthenticatorTest {
         if (isScram)
             ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
         SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext,
-                securityProtocol, listenerName, saslMechanism, true, credentialCache) {
+                securityProtocol, listenerName, saslMechanism, true, credentialCache, null) {
 
             @Override
             protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
                             TransportLayer transportLayer, Subject subject) throws IOException {
                 return new SaslServerAuthenticator(configs, id, jaasContext, subject, null,
-                                credentialCache, listenerName, securityProtocol, transportLayer) {
+                                credentialCache, listenerName, securityProtocol, transportLayer, null) {
 
                     @Override
                     protected ApiVersionsResponse apiVersionsResponse() {
@@ -998,7 +1034,7 @@ public class SaslAuthenticatorTest {
         final Map<String, ?> configs = Collections.emptyMap();
         final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs);
         SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext,
-                securityProtocol, listenerName, saslMechanism, true, null) {
+                securityProtocol, listenerName, saslMechanism, true, null, null) {
 
             @Override
             protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs, String id,
@@ -1207,4 +1243,15 @@ public class SaslAuthenticatorTest {
     private ApiVersionsRequest createApiVersionsRequestV0() {
         return new ApiVersionsRequest.Builder((short) 0).build();
     }
+
+    /**
+     * Generates a SCRAM credential from {@code password} (4096 iterations) and stores it under
+     * {@code username} in the server's token credential cache, once for every enabled SASL
+     * mechanism that resolves to a SCRAM mechanism. Non-SCRAM mechanisms are skipped.
+     */
+    private void updateTokenCredentialCache(String username, String password) throws NoSuchAlgorithmException {
+        List<String> enabledMechanisms = (List<String>) saslServerConfigs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
+        for (String mechanismName : enabledMechanisms) {
+            ScramMechanism scram = ScramMechanism.forMechanismName(mechanismName);
+            if (scram == null)
+                continue;
+            ScramCredential tokenCredential = new ScramFormatter(scram).generateCredential(password, 4096);
+            server.tokenCache().credentialCache(scram.mechanismName()).put(username, tokenCredential);
+        }
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 022a099..51ea58e 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.security.scram.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
@@ -110,7 +112,7 @@ public class SaslServerAuthenticatorTest {
         JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
         Subject subject = new Subject();
         return new SaslServerAuthenticator(configs, "node", jaasContext, subject, null, new CredentialCache(),
-                new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer);
+                new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, new DelegationTokenCache(ScramMechanism.mechanismNames()));
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
index 89e6260..58be6e1 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
@@ -70,7 +70,7 @@ public class ScramMessagesTest {
     @Test
     public void validClientFirstMessage() throws SaslException {
         String nonce = formatter.secureRandomString();
-        ClientFirstMessage m = new ClientFirstMessage("someuser", nonce);
+        ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, "");
         checkClientFirstMessage(m, "someuser", nonce, "");
 
         // Default format used by Kafka client: only user and nonce are specified
@@ -107,6 +107,11 @@ public class ScramMessagesTest {
             str = String.format("n,,n=testuser,r=%s,%s", nonce, extension);
             checkClientFirstMessage(createScramMessage(ClientFirstMessage.class, str), "testuser", nonce, "");
         }
+
+        //optional tokenauth specified as extensions
+        str = String.format("n,,n=testuser,r=%s,%s", nonce, "tokenauth=true");
+        m = createScramMessage(ClientFirstMessage.class, str);
+        assertEquals("tokenauth=true", m.extensions());
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
index fd7f988..82ad914 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.security.scram;
 
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,7 +44,7 @@ public class ScramSaslServerTest {
         CredentialCache.Cache<ScramCredential> credentialCache = new CredentialCache().createCache(mechanism.mechanismName(), ScramCredential.class);
         credentialCache.put(USER_A, formatter.generateCredential("passwordA", 4096));
         credentialCache.put(USER_B, formatter.generateCredential("passwordB", 4096));
-        ScramServerCallbackHandler callbackHandler = new ScramServerCallbackHandler(credentialCache);
+        ScramServerCallbackHandler callbackHandler = new ScramServerCallbackHandler(credentialCache, new DelegationTokenCache(ScramMechanism.mechanismNames()));
         saslServer = new ScramSaslServer(mechanism, new HashMap<String, Object>(), callbackHandler);
     }
 
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 2732f6c..fa6333c 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -34,7 +34,8 @@ object AclCommand extends Logging {
     Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All),
     Group -> Set(Read, Describe, All),
     Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
-    TransactionalId -> Set(Describe, Write, All)
+    TransactionalId -> Set(Describe, Write, All),
+    DelegationToken -> Set(Describe, All)
   )
 
   def main(args: Array[String]) {
@@ -246,8 +247,11 @@ object AclCommand extends Logging {
       opts.options.valuesOf(opts.transactionalIdOpt).asScala.foreach(transactionalId =>
         resources += new Resource(TransactionalId, transactionalId))
 
+    if (opts.options.has(opts.delegationTokenOpt))
+      opts.options.valuesOf(opts.delegationTokenOpt).asScala.foreach(token => resources += new Resource(DelegationToken, token.trim))
+
     if (resources.isEmpty && dieIfNoResourceFound)
-      CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group>")
+      CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>")
 
     resources
   }
@@ -304,6 +308,12 @@ object AclCommand extends Logging {
       "used in combination with the --producer option. Note that idempotence is enabled automatically if " +
       "the producer is authorized to a particular transactional-id.")
 
+    val delegationTokenOpt = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
+      "A value of * indicates ACL should apply to all tokens.")
+      .withRequiredArg
+      .describedAs("delegation-token")
+      .ofType(classOf[String])
+
     val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
     val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
     val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 1051993..4a37330 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -34,7 +34,10 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
 import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.utils.{LogContext, KafkaThread, Time, Utils}
+import org.apache.kafka.common.utils.LogContext
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
+import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
@@ -333,6 +336,34 @@ class AdminClient(val time: Time,
     ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
   }
 
+  /**
+   * Sends a CreateDelegationToken request via `sendAnyNode` and rebuilds the token from the response.
+   * The renewer list of the returned token is the one passed in by the caller, not one read from the response.
+   *
+   * @param renewers principals to register as renewers of the new token
+   * @param maxTimePeriodMs requested max life time in milliseconds (default -1)
+   * @return the response error together with the token assembled from the response fields
+   */
+  def createToken(renewers: List[KafkaPrincipal], maxTimePeriodMs: Long = -1): (Errors, DelegationToken) = {
+    val requestBuilder = new CreateDelegationTokenRequest.Builder(renewers.asJava, maxTimePeriodMs)
+    val createResponse = sendAnyNode(ApiKeys.CREATE_DELEGATION_TOKEN, requestBuilder).asInstanceOf[CreateDelegationTokenResponse]
+    val token = new DelegationToken(
+      new TokenInformation(createResponse.tokenId, createResponse.owner, renewers.asJava,
+        createResponse.issueTimestamp, createResponse.maxTimestamp, createResponse.expiryTimestamp),
+      createResponse.hmacBytes)
+    (createResponse.error, token)
+  }
+
+  /**
+   * Sends a RenewDelegationToken request for the token identified by `hmac` via `sendAnyNode`.
+   *
+   * @return the response error and the expiry timestamp reported by the response
+   */
+  def renewToken(hmac: ByteBuffer, renewTimePeriod: Long = -1): (Errors, Long) = {
+    val requestBuilder = new RenewDelegationTokenRequest.Builder(hmac, renewTimePeriod)
+    val renewResponse = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, requestBuilder).asInstanceOf[RenewDelegationTokenResponse]
+    (renewResponse.error, renewResponse.expiryTimestamp)
+  }
+
+  /**
+   * Sends an ExpireDelegationToken request for the token identified by `hmac` via `sendAnyNode`.
+   *
+   * @return the response error and the expiry timestamp reported by the response
+   */
+  def expireToken(hmac: ByteBuffer, expiryTimeStamp: Long = -1): (Errors, Long) = {
+    val requestBuilder = new ExpireDelegationTokenRequest.Builder(hmac, expiryTimeStamp)
+    val expireResponse = sendAnyNode(ApiKeys.EXPIRE_DELEGATION_TOKEN, requestBuilder).asInstanceOf[ExpireDelegationTokenResponse]
+    (expireResponse.error, expireResponse.expiryTimestamp)
+  }
+
+  /**
+   * Sends a DescribeDelegationToken request via `sendAnyNode` and returns the tokens in the response.
+   *
+   * @param owners owner principals to describe tokens for; a null list is forwarded to the
+   *               request builder as null rather than being turned into an empty list
+   * @return the response error and the tokens listed in the response
+   */
+  def describeToken(owners: List[KafkaPrincipal]): (Errors, List[DelegationToken]) = {
+    val ownersList = if (owners == null) null else owners.asJava
+    // The API key must match the request being built (this previously said RENEW_DELEGATION_TOKEN).
+    val responseBody = sendAnyNode(ApiKeys.DESCRIBE_DELEGATION_TOKEN, new DescribeDelegationTokenRequest.Builder(ownersList))
+    val response = responseBody.asInstanceOf[DescribeDelegationTokenResponse]
+    (response.error, response.tokens().asScala.toList)
+  }
+
+
   def close() {
     running = false
     try {
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
new file mode 100644
index 0000000..6c5d1ce
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.nio.ByteBuffer
+
+import joptsimple._
+import kafka.utils.{CommandLineUtils, Exit, Logging}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+import org.apache.kafka.common.utils.{Base64, SecurityUtils, Utils}
+
+import scala.collection.JavaConverters._
+import scala.collection.Set
+
+/**
+ * A command to manage delegation token.
+ */
+object DelegationTokenCommand extends Logging {
+
+  /**
+   * Command entry point. Parses the arguments, requires exactly one of --create, --renew,
+   * --expire or --describe, validates the remaining options, then runs the chosen action
+   * against an admin client. The client is always closed and the process exits with code 0,
+   * or 1 if the action threw.
+   */
+  def main(args: Array[String]): Unit = {
+    val opts = new DelegationTokenCommandOptions(args)
+
+    if (args.isEmpty)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Tool to create, renew, expire, or describe delegation tokens.")
+
+    // should have exactly one action
+    val actionOpts = Seq(opts.createOpt, opts.renewOpt, opts.expiryOpt, opts.describeOpt)
+    if (actionOpts.count(opt => opts.options.has(opt)) != 1)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe")
+
+    opts.checkArgs()
+
+    val adminClient = createAdminClient(opts)
+
+    var exitCode = 0
+    try {
+      val parsed = opts.options
+      if (parsed.has(opts.createOpt))
+        createToken(adminClient, opts)
+      else if (parsed.has(opts.renewOpt))
+        renewToken(adminClient, opts)
+      else if (parsed.has(opts.expiryOpt))
+        expireToken(adminClient, opts)
+      else if (parsed.has(opts.describeOpt))
+        describeToken(adminClient, opts)
+    } catch {
+      case failure: Throwable =>
+        println("Error while executing delegation token command : " + failure.getMessage)
+        error(Utils.stackTrace(failure))
+        exitCode = 1
+    } finally {
+      adminClient.close()
+      Exit.exit(exitCode)
+    }
+  }
+
+  /**
+   * Handles --create: reads the renewer principals and max life time from the options, asks the
+   * admin client to create a token, and prints it. Throws AdminOperationException carrying the
+   * error message when the response error is not NONE.
+   */
+  def createToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+    val renewers = getPrincipals(opts, opts.renewPrincipalsOpt)
+    val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue
+
+    println("Calling create token operation with renewers :" + renewers +" , max-life-time-period :"+ maxLifeTimeMs)
+    adminClient.createToken(renewers, maxLifeTimeMs) match {
+      case (Errors.NONE, token) =>
+        println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId))
+        printToken(List(token))
+      case (failure, _) =>
+        throw new AdminOperationException(failure.message)
+    }
+  }
+
+  /**
+   * Prints a fixed-width table of the given tokens to standard output: one header row, then
+   * one row per token with its id, Base64 HMAC, owner, renewers and the three timestamps.
+   */
+  def printToken(tokens: List[DelegationToken]): Unit = {
+    // Header and data rows share the same column layout; each row starts on a fresh line.
+    val rowFormat = "\n%-15s %-30s %-15s %-25s %-15s %-15s %-15s"
+    print(rowFormat.format("TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
+    tokens.foreach { token =>
+      val info = token.tokenInfo
+      print(rowFormat.format(
+        info.tokenId,
+        token.hmacAsBase64String,
+        info.owner,
+        info.renewersAsString,
+        info.issueTimestamp,
+        info.expiryTimestamp,
+        info.maxTimestamp))
+      println()
+    }
+  }
+
+  /**
+   * Parses every value supplied for `principalOptionSpec` (trimmed) into a KafkaPrincipal.
+   * Returns an empty list when the option was not given on the command line.
+   */
+  private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): List[KafkaPrincipal] = {
+    if (!opts.options.has(principalOptionSpec))
+      List.empty[KafkaPrincipal]
+    else
+      opts.options.valuesOf(principalOptionSpec).asScala.toList.map(raw => SecurityUtils.parseKafkaPrincipal(raw.trim))
+  }
+
+  /**
+   * Handles --renew: Base64-decodes the --hmac value, asks the admin client to renew that token
+   * for the requested period, and prints the new expiry timestamp. Throws AdminOperationException
+   * carrying the error message when the response error is not NONE.
+   */
+  def renewToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+    val encodedHmac = opts.options.valueOf(opts.hmacOpt)
+    val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
+    println("Calling renew token operation with hmac :" + encodedHmac +" , renew-time-period :"+ renewTimePeriodMs)
+    val hmacBytes = ByteBuffer.wrap(Base64.decoder.decode(encodedHmac))
+    adminClient.renewToken(hmacBytes, renewTimePeriodMs) match {
+      case (Errors.NONE, newExpiry) =>
+        println("Completed renew operation. New expiry timestamp : %s".format(newExpiry))
+      case (failure, _) =>
+        throw new AdminOperationException(failure.message)
+    }
+  }
+
+  /**
+   * Handles --expire: Base64-decodes the --hmac value, asks the admin client to expire that token
+   * with the requested period, and prints the new expiry timestamp. Throws AdminOperationException
+   * carrying the error message when the response error is not NONE.
+   */
+  def expireToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+    val encodedHmac = opts.options.valueOf(opts.hmacOpt)
+    val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
+    println("Calling expire token operation with hmac :" + encodedHmac +" , expire-time-period : "+ expiryTimePeriodMs)
+    val hmacBytes = ByteBuffer.wrap(Base64.decoder.decode(encodedHmac))
+    adminClient.expireToken(hmacBytes, expiryTimePeriodMs) match {
+      case (Errors.NONE, newExpiry) =>
+        println("Completed expire operation. New expiry timestamp : %s".format(newExpiry))
+      case (failure, _) =>
+        throw new AdminOperationException(failure.message)
+    }
+  }
+
+  /**
+   * Handles --describe: reads the owner principals from the options, asks the admin client for
+   * their tokens, and prints the count followed by the token table. Throws AdminOperationException
+   * carrying the error message when the response error is not NONE.
+   */
+  def describeToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+    val owners = getPrincipals(opts, opts.ownerPrincipalsOpt)
+    println("Calling describe token operation for owners :" + owners)
+    adminClient.describeToken(owners) match {
+      case (Errors.NONE, tokens) =>
+        println("Total Number of tokens : %s".format(tokens.size))
+        printToken(tokens)
+      case (failure, _) =>
+        throw new AdminOperationException(failure.message)
+    }
+  }
+
+  /**
+   * Builds the admin client from the properties file named by --command-config, with the
+   * bootstrap servers set from --bootstrap-server (replacing any value loaded from the file).
+   */
+  private def createAdminClient(opts: DelegationTokenCommandOptions): AdminClient = {
+    val configFile = opts.options.valueOf(opts.commandConfigOpt)
+    val clientProps = Utils.loadProps(configFile)
+    clientProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+    AdminClient.create(clientProps)
+  }
+
+  /**
+   * Command line options of the delegation token tool. Declares every option on a shared
+   * parser, parses `args` on construction, and offers `checkArgs` to validate the combination
+   * of options against the selected action.
+   */
+  class DelegationTokenCommandOptions(args: Array[String]) {
+    val BootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping."
+    val CommandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+      " operations are allowed in secure mode only. This config file is used to pass security related configs."
+
+    val parser = new OptionParser(false)
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
+                                   .withRequiredArg
+                                   .ofType(classOf[String])
+    val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+      .withRequiredArg
+      .ofType(classOf[String])
+
+    // Action flags; main() requires exactly one of them.
+    val createOpt = parser.accepts("create", "Create a new delegation token.")
+    val renewOpt = parser.accepts("renew",  "Renew delegation token.")
+    val expiryOpt = parser.accepts("expire", "Expire delegation token.")
+    val describeOpt = parser.accepts("describe", "describe delegation tokens.")
+
+    val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It should be in principalType:name format.")
+      .withOptionalArg()
+      .ofType(classOf[String])
+
+    val renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a kafka principal. It should be in principalType:name format.")
+      .withOptionalArg()
+      .ofType(classOf[String])
+
+    val maxLifeTimeOpt = parser.accepts("max-life-time-period", "Max life period for the token in milliseconds. If the value is -1," +
+      " then token max life time will default to a server side config value (delegation.token.max.lifetime.ms).")
+      .withOptionalArg()
+      .ofType(classOf[Long])
+
+    val renewTimePeriodOpt = parser.accepts("renew-time-period", "Renew time period in milliseconds. If the value is -1, then the" +
+      " renew time period will default to a server side config value (delegation.token.expiry.time.ms).")
+      .withOptionalArg()
+      .ofType(classOf[Long])
+
+    val expiryTimePeriodOpt = parser.accepts("expiry-time-period", "Expiry time period in milliseconds. If the value is -1, then the" +
+      " token will get invalidated immediately." )
+      .withOptionalArg()
+      .ofType(classOf[Long])
+
+    val hmacOpt = parser.accepts("hmac", "HMAC of the delegation token")
+      .withOptionalArg
+      .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    /**
+     * Validates the parsed options: the connection options are always required, each action
+     * requires its own arguments, and options belonging to a different action are rejected.
+     */
+    def checkArgs() {
+      // check required args
+      CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, commandConfigOpt)
+
+      if (options.has(createOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt)
+
+      if (options.has(renewOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, renewTimePeriodOpt)
+
+      if (options.has(expiryOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, expiryTimePeriodOpt)
+
+      // check invalid args
+      CommandLineUtils.checkInvalidArgs(parser, options, createOpt, Set(hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt, ownerPrincipalsOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, Set(renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt))
+      // --expire must reject --renewer-principal (the set previously named the --renew action flag instead).
+      CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, Set(renewPrincipalsOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt))
+      // --describe takes only --owner-principal (the set previously listed renewTimePeriodOpt twice and missed --renewer-principal).
+      CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(renewPrincipalsOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt))
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6b5c34e..a8e1249 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -55,7 +55,8 @@ object KafkaController extends Logging {
 }
 
 class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo,
-                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+                      tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+
   this.logIdent = s"[Controller id=${config.brokerId}] "
 
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
@@ -90,6 +91,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   @volatile private var globalTopicCount = 0
   @volatile private var globalPartitionCount = 0
 
+  /* single-thread scheduler to clean expired tokens */
+  private val tokenCleanScheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "delegation-token-cleaner")
+
   newGauge(
     "ActiveControllerCount",
     new Gauge[Int] {
@@ -243,6 +247,15 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     if (config.autoLeaderRebalanceEnable) {
       scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
     }
+
+    if (config.tokenAuthEnabled) {
+      info("starting the token expiry check scheduler")
+      tokenCleanScheduler.startup()
+      tokenCleanScheduler.schedule(name = "delete-expired-tokens",
+        fun = tokenManager.expireTokens,
+        period = config.delegationTokenExpiryCheckIntervalMs,
+        unit = TimeUnit.MILLISECONDS)
+    }
   }
 
   private def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = {
@@ -272,6 +285,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     globalTopicCount = 0
     globalPartitionCount = 0
 
+    // stop token expiry check scheduler
+    if (tokenCleanScheduler.isStarted)
+      tokenCleanScheduler.shutdown()
+
     // de-register partition ISR listener for on-going partition reassignment task
     unregisterPartitionReassignmentIsrChangeHandlers()
     // shutdown partition state machine
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 200bfe2..d5e49a5 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -437,7 +437,7 @@ private[kafka] class Processor(val id: Int,
   )
 
   private val selector = createSelector(
-      ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
+      ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache, credentialProvider.tokenCache))
   // Visible to override for testing
   protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = new KSelector(
     maxRequestSize,
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 5d9d7ba..120e8f9 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -23,8 +23,9 @@ import org.apache.kafka.common.security.authenticator.CredentialCache
 import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.config.ConfigDef._
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
 
-class CredentialProvider(saslEnabledMechanisms: List[String]) {
+class CredentialProvider(saslEnabledMechanisms: List[String], val tokenCache: DelegationTokenCache) {
 
   val credentialCache = new CredentialCache
   ScramCredentialUtils.createCache(credentialCache, saslEnabledMechanisms)
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index b046ddd..66da610 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -49,6 +49,12 @@ case object TransactionalId extends ResourceType {
   val toJava = JResourceType.TRANSACTIONAL_ID
 }
 
+/**
+ * Resource type for delegation tokens. Carries the DELEGATION_TOKEN_AUTHORIZATION_FAILED
+ * error code and maps to the Java DELEGATION_TOKEN resource type.
+ */
+case object DelegationToken extends ResourceType {
+  val name = "DelegationToken"
+  val toJava = JResourceType.DELEGATION_TOKEN
+  val error = Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
+}
+
 object ResourceType {
 
   def fromString(resourceType: String): ResourceType = {
@@ -56,7 +62,7 @@ object ResourceType {
     rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
   }
 
-  def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId)
+  def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId, DelegationToken)
 
   def fromJava(operation: JResourceType): ResourceType = fromString(operation.toString.replaceAll("_", ""))
 }
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
new file mode 100644
index 0000000..008dc32
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -0,0 +1,515 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.security.InvalidKeyException
+import javax.crypto.spec.SecretKeySpec
+import javax.crypto.{Mac, SecretKey}
+
+import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{CoreUtils, Json, Logging}
+import kafka.zk.{DelegationTokenChangeNotificationSequenceZNode, DelegationTokenChangeNotificationZNode, DelegationTokensZNode, KafkaZkClient}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.scram.{ScramCredential, ScramFormatter, ScramMechanism}
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.utils.{Base64, Sanitizer, SecurityUtils, Time}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+object DelegationTokenManager {
+  val DefaultHmacAlgorithm = "HmacSHA512"
+  val OwnerKey ="owner"
+  val RenewersKey = "renewers"
+  val IssueTimestampKey = "issueTimestamp"
+  val MaxTimestampKey = "maxTimestamp"
+  val ExpiryTimestampKey = "expiryTimestamp"
+  val TokenIdKey = "tokenId"
+  val VersionKey = "version"
+  val CurrentVersion = 1
+  val ErrorTimestamp = -1
+
+  /**
+   * Compute the HMAC of a token id, building the key from the UTF-8 bytes of a string secret.
+   * @param tokenId the token id to sign
+   * @param secretKey the secret whose UTF-8 bytes are used as the HMAC key
+   * @return the raw HMAC bytes
+   */
+  def createHmac(tokenId: String, secretKey: String) : Array[Byte] = {
+    createHmac(tokenId, createSecretKey(secretKey.getBytes(StandardCharsets.UTF_8)))
+  }
+
+  /**
+   * Wrap the given bytes in a secret key for the default HMAC algorithm (DefaultHmacAlgorithm)
+   * @param keybytes the byte[] to create the secret key from
+   * @return the secret key
+   */
+  def createSecretKey(keybytes: Array[Byte]) : SecretKey = {
+    new SecretKeySpec(keybytes, DefaultHmacAlgorithm)
+  }
+
+  /**
+   * Compute the HMAC of a token id and encode it as a base64 string.
+   *
+   * @param tokenId the token id to sign
+   * @param secretKey the key used for the HMAC computation
+   * @return the base64 encoding of the HMAC bytes
+   */
+  def createBase64HMAC(tokenId: String, secretKey: SecretKey) : String = {
+    val hmac = createHmac(tokenId, secretKey)
+    Base64.encoder.encodeToString(hmac)
+  }
+
+  /**
+   * Compute HMAC of the identifier using the secret key; an invalid key is reported as IllegalArgumentException
+   * @param tokenId the token identifier; its UTF-8 bytes are the HMAC input
+   * @param secretKey  the secret key
+   * @return  the raw bytes of the generated hmac
+   */
+  def createHmac(tokenId: String, secretKey: SecretKey) : Array[Byte] = {
+    val mac =  Mac.getInstance(DefaultHmacAlgorithm)
+    try
+      mac.init(secretKey)
+    catch {
+      case ike: InvalidKeyException => throw new IllegalArgumentException("Invalid key to HMAC computation", ike);
+    }
+    mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8))
+  }
+
+  def toJsonCompatibleMap(token: DelegationToken):  Map[String, Any] = {
+    val info = token.tokenInfo
+    // owner and renewer principals are sanitized before they are written out
+    Map[String, Any](
+      VersionKey -> CurrentVersion,
+      OwnerKey -> Sanitizer.sanitize(info.ownerAsString),
+      RenewersKey -> info.renewersAsString.asScala.map(e => Sanitizer.sanitize(e)).asJava,
+      IssueTimestampKey -> info.issueTimestamp,
+      MaxTimestampKey -> info.maxTimestamp,
+      ExpiryTimestampKey -> info.expiryTimestamp,
+      TokenIdKey -> info.tokenId())
+  }
+
+  /**
+   * Decode the JSON representation of a token's information.
+   * The payload must carry the current version; owner and renewers are desanitized before parsing.
+   * @return the decoded information, or None when the bytes are null or empty or Json.parseBytes yields nothing
+   */
+  def fromBytes(bytes: Array[Byte]): Option[TokenInformation] = {
+    if (bytes == null || bytes.isEmpty)
+      None
+    else
+      Json.parseBytes(bytes).map { js =>
+        val tokenJson = js.asJsonObject
+        require(tokenJson(VersionKey).to[Int] == CurrentVersion)
+        val owner = SecurityUtils.parseKafkaPrincipal(Sanitizer.desanitize(tokenJson(OwnerKey).to[String]))
+        val renewers = tokenJson(RenewersKey).to[Seq[String]]
+          .map(Sanitizer.desanitize(_)).map(SecurityUtils.parseKafkaPrincipal(_))
+        val issueTimestamp = tokenJson(IssueTimestampKey).to[Long]
+        val expiryTimestamp = tokenJson(ExpiryTimestampKey).to[Long]
+        val maxTimestamp = tokenJson(MaxTimestampKey).to[Long]
+        val tokenId = tokenJson(TokenIdKey).to[String]
+
+        new TokenInformation(tokenId, owner, renewers.asJava,
+          issueTimestamp, maxTimestamp, expiryTimestamp)
+      }
+  }
+
+  /**
+   * Decide whether a token should be included in a describe response.
+   *
+   * @param requestedPrincipal the principal issuing the request
+   * @param owners optional owner filter; None places no restriction on the owner
+   * @param token the candidate token
+   * @param authorizeToken authorization check, invoked with the token id, for tokens the
+   *                       requester neither owns nor can renew
+   * @return true when the token passes the owner filter and the requester may see it
+   */
+  def filterToken(requestedPrincipal: KafkaPrincipal, owners : Option[List[KafkaPrincipal]], token: TokenInformation, authorizeToken: String => Boolean) : Boolean = {
+    // exclude tokens which are not requested
+    val requested = owners.forall(_.exists(owner => token.ownerOrRenewer(owner)))
+    // owners and renewers can describe their own tokens
+    def ownToken = token.ownerOrRenewer(requestedPrincipal)
+    // everyone else needs permission on the token; evaluated only when the checks above do not decide
+    def permitted = authorizeToken(token.tokenId)
+    requested && (ownToken || permitted)
+  }
+}
+
+class DelegationTokenManager(val config: KafkaConfig,
+                             val tokenCache: DelegationTokenCache,
+                             val time: Time,
+                             val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
+  this.logIdent = "[Token Manager on Broker " + config.brokerId + "]: "
+
+  import DelegationTokenManager._
+
+  type CreateResponseCallback = CreateTokenResult => Unit
+  type RenewResponseCallback = (Errors, Long) => Unit
+  type ExpireResponseCallback = (Errors, Long) => Unit
+  type DescribeResponseCallback = (Errors, List[DelegationToken]) => Unit
+
+  val secretKey = {
+    // a key is only derived when token authentication is enabled; otherwise the manager holds null
+    val masterKeyBytes =
+      if (config.tokenAuthEnabled) config.delegationTokenMasterKey.value.getBytes(StandardCharsets.UTF_8) else null
+    if (masterKeyBytes != null && masterKeyBytes.length > 0) createSecretKey(masterKeyBytes) else null
+  }
+
+  val tokenMaxLifetime: Long = config.delegationTokenMaxLifeMs
+  val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
+  val tokenRemoverScanInterval: Long = config.delegationTokenExpiryCheckIntervalMs
+  private val lock = new Object()
+  private var tokenChangeListener: ZkNodeChangeNotificationListener = null
+
+  def startup() = { // does nothing unless token authentication is enabled
+    if (config.tokenAuthEnabled) {
+      zkClient.createDelegationTokenPaths
+      loadCache // the cache is filled before the change listener is created and initialised
+      tokenChangeListener = new ZkNodeChangeNotificationListener(zkClient, DelegationTokenChangeNotificationZNode.path, DelegationTokenChangeNotificationSequenceZNode.SequenceNumberPrefix, TokenChangedNotificationHandler)
+      tokenChangeListener.init
+    }
+  }
+
+  def shutdown() = {
+    // the listener is only created by startup() when token authentication is enabled
+    if (config.tokenAuthEnabled && tokenChangeListener != null)
+      tokenChangeListener.close()
+  }
+
+  /**
+   * Fill the cache with every token listed under the delegation tokens path in ZooKeeper.
+   */
+  private def loadCache() {
+    lock.synchronized {
+      val tokenIds = zkClient.getChildren(DelegationTokensZNode.path)
+      info(s"Loading the token cache. Total token count : " + tokenIds.size)
+      tokenIds.foreach { tokenId =>
+        // a token that cannot be read is logged and skipped so the remaining ones still load
+        try getTokenFromZk(tokenId).foreach(updateCache)
+        catch {
+          case ex: Throwable => error(s"Error while getting Token for tokenId :$tokenId", ex)
+        }
+      }
+    }
+  }
+
+  /**
+   * Read a token's information from ZooKeeper and pair it with its HMAC.
+   * @param tokenId id of the token to look up
+   * @return the token, or None when ZooKeeper returns no information for the id
+   */
+  private def getTokenFromZk(tokenId: String): Option[DelegationToken] = {
+    zkClient.getDelegationTokenInfo(tokenId).map { tokenInformation =>
+      new DelegationToken(tokenInformation, createHmac(tokenId, secretKey))
+    }
+  }
+
+  /**
+   * Put the token into the cache along with the SCRAM credentials derived from its HMAC.
+   *
+   * @param token the token to add or refresh
+   */
+  private def updateCache(token: DelegationToken): Unit = {
+    val credentials = prepareScramCredentials(token.hmacAsBase64String).asJava
+    tokenCache.updateCache(token, credentials)
+  }
+  /**
+   * @param hmacString password from which one credential per ScramMechanism is generated, keyed by mechanism name
+   */
+  private def prepareScramCredentials(hmacString: String) : Map[String, ScramCredential] = {
+    val scramCredentialMap = mutable.Map[String, ScramCredential]()
+
+    def scramCredential(mechanism: ScramMechanism): ScramCredential = {
+      new ScramFormatter(mechanism).generateCredential(hmacString, mechanism.minIterations)
+    }
+
+    for (mechanism <- ScramMechanism.values)
+      scramCredentialMap(mechanism.mechanismName) = scramCredential(mechanism)
+
+    scramCredentialMap.toMap
+  }
+
+  /**
+   * Create a new token, store it and hand the outcome to the callback.
+   * @param owner owner of the new token
+   * @param renewers principals recorded as renewers of the token
+   * @param maxLifeTimeMs requested maximum lifetime; non-positive selects the configured maximum, larger values are capped to it
+   * @param responseCallback receives the result; its error is DELEGATION_TOKEN_AUTH_DISABLED when token authentication is off
+   */
+  def createToken(owner: KafkaPrincipal,
+                  renewers: List[KafkaPrincipal],
+                  maxLifeTimeMs: Long,
+                  responseCallback: CreateResponseCallback) {
+
+    if (!config.tokenAuthEnabled) {
+      responseCallback(CreateTokenResult(-1, -1, -1, "", Array[Byte](), Errors.DELEGATION_TOKEN_AUTH_DISABLED))
+    } else {
+      lock.synchronized {
+        val tokenId = CoreUtils.generateUuidAsBase64
+
+        val issueTimeStamp = time.milliseconds
+        val maxLifeTime = if (maxLifeTimeMs <= 0) tokenMaxLifetime else Math.min(maxLifeTimeMs, tokenMaxLifetime)
+        val maxLifeTimeStamp = issueTimeStamp + maxLifeTime
+        val expiryTimeStamp = Math.min(maxLifeTimeStamp, issueTimeStamp + defaultTokenRenewTime) // first expiry never exceeds the maximum lifetime
+
+        val tokenInfo = new TokenInformation(tokenId, owner, renewers.asJava, issueTimeStamp, maxLifeTimeStamp, expiryTimeStamp)
+
+        val hmac = createHmac(tokenId, secretKey)
+        val token = new DelegationToken(tokenInfo, hmac)
+        updateToken(token)
+        info(s"Created a delegation token : $tokenId for owner : $owner")
+        responseCallback(CreateTokenResult(issueTimeStamp, expiryTimeStamp, maxLifeTimeStamp, tokenId, hmac, Errors.NONE))
+      }
+    }
+  }
+
+  /**
+   * Extend the expiry time of the token identified by the given HMAC.
+   * @param principal principal requesting the renewal; must be the owner or a renewer of the token
+   * @param hmac HMAC identifying the token
+   * @param renewLifeTimeMs renewal period counted from now; negative values select the configured default
+   * @param renewCallback receives the error code and the new expiry timestamp (-1 on error)
+   */
+  def renewToken(principal: KafkaPrincipal,
+                 hmac: ByteBuffer,
+                 renewLifeTimeMs: Long,
+                 renewCallback: RenewResponseCallback) {
+
+    if (!config.tokenAuthEnabled) {
+      renewCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, -1)
+    } else {
+      lock.synchronized  {
+        getToken(hmac) match {
+          case Some(token) => {
+            val now = time.milliseconds
+            val tokenInfo =  token.tokenInfo
+
+            if (!allowedToRenew(principal, tokenInfo)) {
+              renewCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
+            } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
+              renewCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
+            } else {
+              val renewLifeTime = if (renewLifeTimeMs < 0) defaultTokenRenewTime else renewLifeTimeMs
+              val renewTimeStamp = now + renewLifeTime
+              val expiryTimeStamp = Math.min(tokenInfo.maxTimestamp, renewTimeStamp) // renewal cannot move the expiry past the maximum lifetime
+              tokenInfo.setExpiryTimestamp(expiryTimeStamp)
+
+              updateToken(token)
+              info(s"Delegation token renewed for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+              renewCallback(Errors.NONE, expiryTimeStamp)
+            }
+          }
+          case None => renewCallback(Errors.DELEGATION_TOKEN_NOT_FOUND, -1)
+        }
+      }
+    }
+  }
+
+  /**
+   * @param token token to write to ZooKeeper and to the local cache, followed by a token change notification
+   */
+  private def updateToken(token: DelegationToken): Unit = {
+    zkClient.setOrCreateDelegationToken(token)
+    updateCache(token)
+    zkClient.createTokenChangeNotification(token.tokenInfo.tokenId())
+  }
+
+  /**
+   * Look up the token whose HMAC matches the given bytes.
+   * Reading the HMAC consumes the remaining bytes of the supplied buffer.
+   * @param hmac buffer holding the raw HMAC
+   * @return the matching token, or None when the cache has no entry or the lookup throws
+   */
+  private def getToken(hmac: ByteBuffer): Option[DelegationToken] = {
+    try {
+      val hmacBytes = new Array[Byte](hmac.remaining)
+      hmac.get(hmacBytes)
+      val cachedInfo = tokenCache.tokenForHmac(Base64.encoder.encodeToString(hmacBytes))
+      Option(cachedInfo).map(info => new DelegationToken(info, hmacBytes))
+    } catch {
+      case e: Exception =>
+        error("Exception while getting token for hmac", e)
+        None
+    }
+  }
+
+  /**
+   * Check whether a principal may renew or expire a token.
+   *
+   * @param principal the principal making the request
+   * @param tokenInfo the token being operated on
+   * @return true when the principal is the token owner or one of its renewers
+   */
+  private def allowedToRenew(principal: KafkaPrincipal, tokenInfo: TokenInformation): Boolean =
+    principal.equals(tokenInfo.owner) || tokenInfo.renewers.asScala.toList.contains(principal)
+
+  /**
+   * Look up a cached token by id and attach its HMAC.
+   * @param tokenId id of the token
+   * @return the token, or None when the cache has no entry for the id
+   */
+  def getToken(tokenId: String): Option[DelegationToken] = {
+    val cached = tokenCache.token(tokenId)
+    Option(cached).map(info => getToken(info))
+  }
+
+  /**
+   * Build a full token from token information by recomputing its HMAC with this manager's secret key.
+   * @param tokenInfo information of the token
+   * @return the token carrying the freshly computed HMAC
+   */
+  private def getToken(tokenInfo: TokenInformation): DelegationToken = {
+    val tokenId = tokenInfo.tokenId
+    new DelegationToken(tokenInfo, createHmac(tokenId, secretKey))
+  }
+
+  /**
+   * Remove the token identified by the given HMAC, or change its expiry time.
+   * @param principal principal making the request; must be the owner or a renewer of the token
+   * @param hmac HMAC identifying the token
+   * @param expireLifeTimeMs negative to remove the token immediately, otherwise the new expiry period counted from now
+   * @param expireResponseCallback receives the error code and the resulting expiry timestamp (-1 on error)
+   */
+  def expireToken(principal: KafkaPrincipal,
+                  hmac: ByteBuffer,
+                  expireLifeTimeMs: Long,
+                  expireResponseCallback: ExpireResponseCallback) {
+
+    if (!config.tokenAuthEnabled) {
+      expireResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, -1)
+    } else {
+      lock.synchronized  {
+        getToken(hmac) match {
+          case Some(token) =>  {
+            val tokenInfo =  token.tokenInfo
+            val now = time.milliseconds
+
+            if (!allowedToRenew(principal, tokenInfo)) {
+              expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
+            } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
+              expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
+            } else if (expireLifeTimeMs < 0) { //expire immediately
+              removeToken(tokenInfo.tokenId)
+              info(s"Token expired for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+              expireResponseCallback(Errors.NONE, now)
+            } else {
+              //set expiry time stamp, never later than the token's maximum lifetime
+              val expiryTimeStamp = Math.min(tokenInfo.maxTimestamp, now + expireLifeTimeMs)
+              tokenInfo.setExpiryTimestamp(expiryTimeStamp)
+
+              updateToken(token)
+              info(s"Updated expiry time for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+              expireResponseCallback(Errors.NONE, expiryTimeStamp)
+            }
+          }
+          case None => expireResponseCallback(Errors.DELEGATION_TOKEN_NOT_FOUND, -1)
+        }
+      }
+    }
+  }
+
+  /**
+   * Delete the token from ZooKeeper and from the local cache, then create a token change notification.
+   * @param tokenId id of the token to remove
+   */
+  private def removeToken(tokenId: String): Unit = {
+    zkClient.deleteDelegationToken(tokenId)
+    removeCache(tokenId)
+    zkClient.createTokenChangeNotification(tokenId)
+  }
+
+  /**
+   * Drop the token from the local cache only; ZooKeeper is not touched.
+   * @param tokenId id of the token to drop
+   */
+  private def removeCache(tokenId: String): Unit = {
+    tokenCache.removeCache(tokenId)
+  }
+
+  /**
+   * Remove every cached token whose maximum lifetime or expiry time has passed.
+   * The current time is read again for each token that is examined.
+   */
+  def expireTokens(): Unit = {
+    lock.synchronized {
+      for (tokenInfo <- getAllTokenInformation) {
+        val now = time.milliseconds
+        if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
+          info(s"Delegation token expired for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+          removeToken(tokenInfo.tokenId)
+        }
+      }
+    }
+  }
+
+  /**
+   * List the token information currently held in the cache.
+   * @return the cached token information copied into an immutable list
+   */
+  def getAllTokenInformation(): List[TokenInformation] = {
+    tokenCache.tokens.asScala.toList
+  }
+
+  def getTokens(filterToken: TokenInformation => Boolean): List[DelegationToken] =
+    // keep the token information accepted by the predicate, then attach the HMAC to each entry
+    getAllTokenInformation().filter(filterToken).map(info => getToken(info))
+
+  object TokenChangedNotificationHandler extends NotificationHandler {
+    // Re-reads the notified token from ZooKeeper: the cache entry is refreshed when
+    // the token is found there and dropped when it is not.
+    override def processNotification(tokenIdBytes: Array[Byte]) {
+      lock.synchronized {
+        val tokenId = new String(tokenIdBytes, StandardCharsets.UTF_8)
+        info(s"Processing Token Notification for tokenId : $tokenId")
+        val latest = getTokenFromZk(tokenId)
+        latest.fold(removeCache(tokenId))(updateCache)
+      }
+    }
+  }
+
+}
+
+case class CreateTokenResult(issueTimestamp: Long,
+                             expiryTimestamp: Long,
+                             maxTimestamp: Long,
+                             tokenId: String,
+                             hmac: Array[Byte],
+                             error: Errors) {
+  // equals/hashCode are written by hand so that the hmac array is compared and hashed by content
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: CreateTokenResult =>
+        error.equals(that.error) &&
+          tokenId.equals(that.tokenId) &&
+          issueTimestamp.equals(that.issueTimestamp) &&
+          expiryTimestamp.equals(that.expiryTimestamp) &&
+          maxTimestamp.equals(that.maxTimestamp) &&
+          (hmac sameElements that.hmac)
+      case _ => false
+    }
+  }
+  // Array.hashCode is identity based; Arrays.hashCode keeps equal results on the same hash code
+  override def hashCode(): Int = {
+    val fields = Seq(issueTimestamp, expiryTimestamp, maxTimestamp, tokenId, java.util.Arrays.hashCode(hmac), error)
+    fields.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 758a305..b145d3c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -56,6 +56,8 @@ import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshake
 import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import DescribeLogDirsResponse.LogDirInfo
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 
 import scala.collection.{mutable, _}
 import scala.collection.JavaConverters._
@@ -80,7 +82,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val quotas: QuotaManagers,
                 brokerTopicStats: BrokerTopicStats,
                 val clusterId: String,
-                time: Time) extends Logging {
+                time: Time,
+                val tokenManager: DelegationTokenManager) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
   val adminZkClient = new AdminZkClient(zkClient)
@@ -135,6 +138,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
         case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
+        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
+        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
+        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
+        case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> handleDescribeTokensRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -1985,6 +1992,127 @@ class KafkaApis(val requestChannel: RequestChannel,
     sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(throttleTimeMs, logDirInfos.asJava))
   }
 
+  // Handles CREATE_DELEGATION_TOKEN: validates the request, then lets the token manager create the token
+  def handleCreateTokenRequest(request: RequestChannel.Request) {
+    val createTokenRequest = request.body[CreateDelegationTokenRequest]
+    // the callback for sending a create token response
+    def sendResponseCallback(createResult: CreateTokenResult) {
+      trace("Sending create token response for correlation id %d to client %s."
+        .format(request.header.correlationId, request.header.clientId))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new CreateDelegationTokenResponse(requestThrottleMs, createResult.error, request.session.principal, createResult.issueTimestamp,
+          createResult.expiryTimestamp, createResult.maxTimestamp, createResult.tokenId, ByteBuffer.wrap(createResult.hmac)))
+    }
+
+    if (!allowTokenRequests(request))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new CreateDelegationTokenResponse(requestThrottleMs, Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, request.session.principal))
+    else {
+      val renewerList = createTokenRequest.renewers().asScala.toList
+      // every renewer must have the USER_TYPE principal type
+      if (renewerList.exists(principal =>  principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) {
+        sendResponseMaybeThrottle(request, requestThrottleMs =>
+          new CreateDelegationTokenResponse(requestThrottleMs, Errors.INVALID_PRINCIPAL_TYPE, request.session.principal))
+      }
+      else {
+        tokenManager.createToken(
+          request.session.principal,
+          renewerList,
+          createTokenRequest.maxLifeTime(),
+          sendResponseCallback
+        )
+      }
+    }
+  }
+
+  // Handles RENEW_DELEGATION_TOKEN: the renewal itself is carried out by the token manager
+  def handleRenewTokenRequest(request: RequestChannel.Request) {
+    val renewTokenRequest = request.body[RenewDelegationTokenRequest]
+    // the callback for sending a renew token response; the trace format has one specifier per supplied argument
+    def sendResponseCallback(error: Errors, expiryTimestamp: Long) {
+      trace("Sending renew token response for correlation id %d to client %s."
+        .format(request.header.correlationId, request.header.clientId))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new RenewDelegationTokenResponse(requestThrottleMs, error, expiryTimestamp))
+    }
+
+    if (!allowTokenRequests(request))
+      sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, DelegationTokenManager.ErrorTimestamp)
+    else {
+      tokenManager.renewToken(
+        request.session.principal,
+        renewTokenRequest.hmac,
+        renewTokenRequest.renewTimePeriod(),
+        sendResponseCallback
+      )
+    }
+  }
+
+  def handleExpireTokenRequest(request: RequestChannel.Request) { // entry point for EXPIRE_DELEGATION_TOKEN requests
+    val expireTokenRequest = request.body[ExpireDelegationTokenRequest]
+
+    // the callback for sending an expire token response carrying the error code and the expiry timestamp
+    def sendResponseCallback(error: Errors, expiryTimestamp: Long) {
+      trace("Sending expire token response for correlation id %d to client %s."
+        .format(request.header.correlationId, request.header.clientId))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new ExpireDelegationTokenResponse(requestThrottleMs, error, expiryTimestamp))
+    }
+
+    if (!allowTokenRequests(request))
+      sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, DelegationTokenManager.ErrorTimestamp)
+    else {
+      tokenManager.expireToken(
+        request.session.principal,
+        expireTokenRequest.hmac(),
+        expireTokenRequest.expiryTimePeriod(),
+        sendResponseCallback
+      )
+    }
+  }
+
+  def handleDescribeTokensRequest(request: RequestChannel.Request) { // entry point for DESCRIBE_DELEGATION_TOKEN requests
+    val describeTokenRequest = request.body[DescribeDelegationTokenRequest]
+
+    // the callback for sending a describe token response; the trace line is written after the response is handed off
+    def sendResponseCallback(error: Errors, tokenDetails: List[DelegationToken]) {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new DescribeDelegationTokenResponse(requestThrottleMs, error, tokenDetails.asJava))
+      trace("Sending describe token response for correlation id %d to client %s."
+        .format(request.header.correlationId, request.header.clientId))
+    }
+
+    if (!allowTokenRequests(request))
+      sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, List.empty)
+    else if (!config.tokenAuthEnabled)
+      sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, List.empty)
+    else {
+      val requestPrincipal = request.session.principal
+
+      if (describeTokenRequest.ownersListEmpty()) { // no token is returned when ownersListEmpty() holds
+        sendResponseCallback(Errors.NONE, List())
+      }
+      else {
+        val owners = if (describeTokenRequest.owners == null) None else Some(describeTokenRequest.owners.asScala.toList) // a null owner list means no owner filter
+        def authorizeToken(tokenId: String) = authorize(request.session, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId))
+        def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken)
+        val tokens =  tokenManager.getTokens(eligible)
+        sendResponseCallback(Errors.NONE, tokens)
+      }
+    }
+  }
+
+  /**
+   * Token requests are refused for sessions authenticated with a delegation token,
+   * for PLAINTEXT connections and for SSL connections with an anonymous principal (1-way SSL).
+   */
+  def allowTokenRequests(request: RequestChannel.Request): Boolean = {
+    val protocol = request.context.securityProtocol
+    val principal = request.session.principal
+    def oneWaySsl = protocol == SecurityProtocol.SSL && principal == KafkaPrincipal.ANONYMOUS
+    !(principal.tokenAuthenticated || protocol == SecurityProtocol.PLAINTEXT || oneWaySsl)
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e24659c..0f0eb14 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -216,6 +216,11 @@ object Defaults {
   val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
   val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
   val SaslKerberosPrincipalToLocalRules = BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
+
+  /** ********* Delegation Token configuration ***********/
+  val DelegationTokenMaxLifeTimeMsDefault = 7 * 24 * 60 * 60 * 1000L
+  val DelegationTokenExpiryTimeMsDefault = 24 * 60 * 60 * 1000L
+  val DelegationTokenExpiryCheckIntervalMsDefault = 1 * 60 * 60 * 1000L
 }
 
 object KafkaConfig {
@@ -413,6 +418,12 @@ object KafkaConfig {
   val SaslKerberosMinTimeBeforeReloginProp = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN
   val SaslKerberosPrincipalToLocalRulesProp = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG
 
+  /** ********* Delegation Token Configuration ****************/
+  val DelegationTokenMasterKeyProp = "delegation.token.master.key"
+  val DelegationTokenMaxLifeTimeProp = "delegation.token.max.lifetime.ms"
+  val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms"
+  val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
+
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Zookeeper host string"
@@ -684,6 +695,14 @@ object KafkaConfig {
   val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
   val SaslKerberosPrincipalToLocalRulesDoc = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
 
+  /** ********* Delegation Token Configuration ****************/
+  val DelegationTokenMasterKeyDoc = "Master/secret key to generate and verify delegation tokens. Same key must be configured across all the brokers. " +
+    " If the key is not set or set to empty string, brokers will disable the delegation token support."
+  val DelegationTokenMaxLifeTimeDoc = "The token has a maximum lifetime beyond which it cannot be renewed anymore. Default value 7 days."
+  val DelegationTokenExpiryTimeMsDoc = "The token validity time in milliseconds before the token needs to be renewed. Default value 1 day." // unit follows the .ms property suffix and its 24 * 60 * 60 * 1000L default
+  val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens."
+
+
   private val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
@@ -884,6 +903,11 @@ object KafkaConfig {
       .define(SaslKerberosTicketRenewJitterProp, DOUBLE, Defaults.SaslKerberosTicketRenewJitter, MEDIUM, SaslKerberosTicketRenewJitterDoc)
       .define(SaslKerberosMinTimeBeforeReloginProp, LONG, Defaults.SaslKerberosMinTimeBeforeRelogin, MEDIUM, SaslKerberosMinTimeBeforeReloginDoc)
       .define(SaslKerberosPrincipalToLocalRulesProp, LIST, Defaults.SaslKerberosPrincipalToLocalRules, MEDIUM, SaslKerberosPrincipalToLocalRulesDoc)
+      /** ********* Delegation Token Configuration ****************/
+      .define(DelegationTokenMasterKeyProp, PASSWORD, null, MEDIUM, DelegationTokenMasterKeyDoc)
+      .define(DelegationTokenMaxLifeTimeProp, LONG, Defaults.DelegationTokenMaxLifeTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenMaxLifeTimeDoc)
+      .define(DelegationTokenExpiryTimeMsProp, LONG, Defaults.DelegationTokenExpiryTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenExpiryTimeMsDoc)
+      .define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DelegationTokenExpiryCheckIntervalMsDefault, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc)
 
   }
 
@@ -1093,6 +1117,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp)
   val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
 
+  /** ********* DelegationToken Configuration **************/
+  val delegationTokenMasterKey = getPassword(KafkaConfig.DelegationTokenMasterKeyProp)
+  val tokenAuthEnabled = (delegationTokenMasterKey != null && !delegationTokenMasterKey.value.isEmpty)
+  val delegationTokenMaxLifeMs = getLong(KafkaConfig.DelegationTokenMaxLifeTimeProp)
+  val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp)
+  val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
+
   /** ********* Quota Configuration **************/
   val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
   val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index df06c1a..355b741 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,6 +43,8 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
@@ -119,10 +121,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   var replicaManager: ReplicaManager = null
   var adminManager: AdminManager = null
+  var tokenManager: DelegationTokenManager = null
 
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
   var dynamicConfigManager: DynamicConfigManager = null
   var credentialProvider: CredentialProvider = null
+  var tokenCache: DelegationTokenCache = null
 
   var groupCoordinator: GroupCoordinator = null
 
@@ -228,7 +232,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         logManager.startup()
 
         metadataCache = new MetadataCache(config.brokerId)
-        credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
+        tokenCache = new DelegationTokenCache(config.saslEnabledMechanisms)
+        credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, tokenCache)
 
         socketServer = new SocketServer(config, metrics, time, credentialProvider)
         socketServer.startup()
@@ -243,8 +248,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         // Now that the broker id is successfully registered, checkpoint it
         checkpointBrokerId(config.brokerId)
 
+        /* start token manager */
+        tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
+        tokenManager.startup()
+
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, threadNamePrefix)
+        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, tokenManager, threadNamePrefix)
         kafkaController.startup()
 
         adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
@@ -269,7 +278,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
           kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
-          brokerTopicStats, clusterId, time)
+          brokerTopicStats, clusterId, time, tokenManager)
 
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
           config.numIoThreads)
@@ -555,6 +564,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         if (groupCoordinator != null)
           CoreUtils.swallow(groupCoordinator.shutdown(), this)
 
+        if (tokenManager != null)
+          CoreUtils.swallow(tokenManager.shutdown(), this)
+
         if (replicaManager != null)
           CoreUtils.swallow(replicaManager.shutdown(), this)
         if (logManager != null)
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ac8b932..098670c 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -207,7 +207,7 @@ class ZkUtils(val zkClient: ZkClient,
               val zkConnection: ZkConnection,
               val isSecure: Boolean) extends Logging {
   import ZkUtils._
-  
+
   // These are persistent ZK paths that should exist on kafka broker startup.
   val persistentZkPaths = ZkData.PersistentZkPaths
 
@@ -723,7 +723,7 @@ class ZkUtils(val zkClient: ZkClient,
     debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
     leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
   }
-  
+
   def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
     val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
     topics.foreach { topic =>
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 8179300..d683a8d 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -31,6 +31,7 @@ import kafka.server.ConfigType
 import kafka.utils.Logging
 import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 import org.apache.zookeeper.data.{ACL, Stat}
@@ -1079,6 +1080,78 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   }
 
   /**
+   * Creates the required zk nodes for Delegation Token storage
+   */
+  def createDelegationTokenPaths(): Unit = {
+    createRecursive(DelegationTokenChangeNotificationZNode.path, throwIfPathExists = false)
+    createRecursive(DelegationTokensZNode.path, throwIfPathExists = false)
+  }
+
+  /**
+   * Creates Delegation Token change notification message
+   * @param tokenId token Id
+   */
+  def createTokenChangeNotification(tokenId: String): Unit = {
+    val path = DelegationTokenChangeNotificationSequenceZNode.createPath
+    val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createResponse = retryRequestUntilConnected(createRequest)
+    createResponse.resultException.foreach(e => throw e)
+  }
+
+  /**
+   * Sets or creates token info znode with the given token details depending on whether it already
+   * exists or not.
+   *
+   * @param token the token to set on the token znode
+   * @throws KeeperException if there is an error while setting or creating the znode
+   */
+  def setOrCreateDelegationToken(token: DelegationToken): Unit = {
+
+    def set(tokenData: Array[Byte]): SetDataResponse = {
+      val setDataRequest = SetDataRequest(DelegationTokenInfoZNode.path(token.tokenInfo().tokenId()), tokenData, ZkVersion.NoVersion)
+      retryRequestUntilConnected(setDataRequest)
+    }
+
+    def create(tokenData: Array[Byte]): CreateResponse = {
+      val path = DelegationTokenInfoZNode.path(token.tokenInfo().tokenId())
+      val createRequest = CreateRequest(path, tokenData, acls(path), CreateMode.PERSISTENT)
+      retryRequestUntilConnected(createRequest)
+    }
+
+    val tokenInfo = DelegationTokenInfoZNode.encode(token)
+    val setDataResponse = set(tokenInfo)
+    setDataResponse.resultCode match {
+      case Code.NONODE =>
+        val createDataResponse = create(tokenInfo)
+        createDataResponse.maybeThrow
+      case _ => setDataResponse.maybeThrow
+    }
+  }
+
+  /**
+   * Gets the Delegation Token Info
+   * @return optional TokenInfo that is Some if the token znode exists and can be parsed and None otherwise.
+   */
+  def getDelegationTokenInfo(delegationTokenId: String): Option[TokenInformation] = {
+    val getDataRequest = GetDataRequest(DelegationTokenInfoZNode.path(delegationTokenId))
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    getDataResponse.resultCode match {
+      case Code.OK => DelegationTokenInfoZNode.decode(getDataResponse.data)
+      case Code.NONODE => None
+      case _ => throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Deletes the given Delegation token node
+   * @param delegationTokenId id of the token whose info znode should be deleted
+   * @return delete status
+   */
+  def deleteDelegationToken(delegationTokenId: String): Boolean = {
+    deleteRecursive(DelegationTokenInfoZNode.path(delegationTokenId))
+  }
+
+  /**
    * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest, which allows data
    * watcher registrations on paths which might not even exist.
    *
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 2c86c2c..99fe591 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -23,13 +23,14 @@ import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.common.KafkaException
 import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
-import kafka.security.auth.{Acl, Resource}
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
-import kafka.server.ConfigType
+import kafka.security.auth.{Acl, Resource}
+import kafka.server.{ConfigType, DelegationTokenManager}
 import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.ZooDefs
 import org.apache.zookeeper.data.{ACL, Stat}
@@ -490,6 +491,32 @@ object ProducerIdBlockZNode {
   def path = "/latest_producer_id_block"
 }
 
+object DelegationTokenAuthZNode {
+  def path = "/delegation_token"
+}
+
+object DelegationTokenChangeNotificationZNode {
+  def path =  s"${DelegationTokenAuthZNode.path}/token_changes"
+}
+
+object DelegationTokenChangeNotificationSequenceZNode {
+  val SequenceNumberPrefix = "token_change_"
+  def createPath = s"${DelegationTokenChangeNotificationZNode.path}/$SequenceNumberPrefix"
+  def deletePath(sequenceNode: String) = s"${DelegationTokenChangeNotificationZNode.path}/${sequenceNode}"
+  def encode(tokenId : String): Array[Byte] = tokenId.getBytes(UTF_8)
+  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+}
+
+object DelegationTokensZNode {
+  def path = s"${DelegationTokenAuthZNode.path}/tokens"
+}
+
+object DelegationTokenInfoZNode {
+  def path(tokenId: String) =  s"${DelegationTokensZNode.path}/$tokenId"
+  def encode(token: DelegationToken): Array[Byte] =  Json.encodeAsBytes(DelegationTokenManager.toJsonCompatibleMap(token).asJava)
+  def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes)
+}
+
 object ZkData {
 
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
@@ -503,11 +530,12 @@ object ZkData {
     AclZNode.path,
     AclChangeNotificationZNode.path,
     ProducerIdBlockZNode.path,
-    LogDirEventNotificationZNode.path)
+    LogDirEventNotificationZNode.path,
+    DelegationTokenAuthZNode.path)
 
   // These are persistent ZK paths that should exist on kafka broker startup.
   val PersistentZkPaths = Seq(
-    "/consumers",  // old consumer path
+    "/consumers", // old consumer path
     BrokerIdsZNode.path,
     TopicsZNode.path,
     ConfigEntityChangeNotificationZNode.path,
@@ -518,7 +546,7 @@ object ZkData {
     LogDirEventNotificationZNode.path
   ) ++ ConfigType.all.map(ConfigEntityTypeZNode.path)
 
-  val SensitiveRootPaths = Seq(ConfigEntityTypeZNode.path(ConfigType.User))
+  val SensitiveRootPaths = Seq(ConfigEntityTypeZNode.path(ConfigType.User), DelegationTokensZNode.path)
 
   def sensitivePath(path: String): Boolean = {
     path != null && SensitiveRootPaths.exists(path.startsWith)
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..efc5049
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -0,0 +1,96 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import java.util
+
+import kafka.admin.AdminClient
+import kafka.server.KafkaConfig
+import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramMechanism
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+import org.junit.Before
+
+import scala.collection.JavaConverters._
+
+class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
+
+  val kafkaClientSaslMechanism = "SCRAM-SHA-256"
+  val kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+  override val clientPrincipal = JaasTestUtils.KafkaScramUser
+  private val clientPassword = JaasTestUtils.KafkaScramPassword
+
+  override val kafkaPrincipal = JaasTestUtils.KafkaScramAdmin
+  private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
+
+  this.serverConfig.setProperty(KafkaConfig.DelegationTokenMasterKeyProp, "testKey")
+
+  override def configureSecurityBeforeServersStart() {
+    super.configureSecurityBeforeServersStart()
+    zkClient.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
+    // Create broker admin credentials before starting brokers
+    createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword)
+  }
+
+  override def configureSecurityAfterServersStart() {
+    super.configureSecurityAfterServersStart()
+
+    // create scram credential for user "scram-user"
+    createScramCredentials(zkConnect, clientPrincipal, clientPassword)
+
+    //create a token with "scram-user" credentials
+    val token = createDelegationToken()
+
+    // pass token to client jaas config
+    val clientLoginContext = JaasTestUtils.tokenClientLoginModule(token.tokenInfo().tokenId(), token.hmacAsBase64String())
+    producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+    consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+  }
+
+  @Before
+  override def setUp() {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
+    super.setUp()
+  }
+
+  private def createDelegationToken(): DelegationToken = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+    val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
+    config.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+
+    val adminClient = AdminClient.create(config.asScala.toMap)
+    var (error, token)  = adminClient.createToken(List())
+
+    //wait for token to reach all the brokers
+    TestUtils.waitUntilTrue(() => servers.forall(server => !server.tokenCache.tokens().isEmpty),
+      "Timed out waiting for token to propagate to all servers")
+    adminClient.close()
+
+    token
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 7f71faf..9b62728 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -38,12 +38,14 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
   private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2"))
   private val TransactionalIdResources = Set(new Resource(TransactionalId, "t0"), new Resource(TransactionalId, "t1"))
+  private val TokenResources = Set(new Resource(DelegationToken, "token1"), new Resource(DelegationToken, "token2"))
 
   private val ResourceToCommand = Map[Set[Resource], Array[String]](
     TopicResources -> Array("--topic", "test-1", "--topic", "test-2"),
     Set(Resource.ClusterResource) -> Array("--cluster"),
     GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2"),
-    TransactionalIdResources -> Array("--transactional-id", "t0", "--transactional-id", "t1")
+    TransactionalIdResources -> Array("--transactional-id", "t0", "--transactional-id", "t1"),
+    TokenResources -> Array("--delegation-token", "token1", "--delegation-token", "token2")
   )
 
   private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
@@ -54,7 +56,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
       Array("--operation", "Create", "--operation", "ClusterAction", "--operation", "DescribeConfigs",
         "--operation", "AlterConfigs", "--operation", "IdempotentWrite")),
     GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe")),
-    TransactionalIdResources -> (Set(Describe, Write), Array("--operation", "Describe", "--operation", "Write"))
+    TransactionalIdResources -> (Set(Describe, Write), Array("--operation", "Describe", "--operation", "Write")),
+    TokenResources -> (Set(Describe), Array("--operation", "Describe"))
   )
 
   private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]](
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 9c09a43..662d6d2 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -61,6 +61,12 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
    */
   def configureSecurityBeforeServersStart() {}
 
+  /**
+   * Override this in case tokens or security credentials need to be created after `servers` are started.
+   * The default implementation of this method is a no-op.
+   */
+  def configureSecurityAfterServersStart() {}
+
   def configs: Seq[KafkaConfig] = {
     if (instanceConfigs == null)
       instanceConfigs = generateConfigs
@@ -94,6 +100,9 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
     brokerList = TestUtils.bootstrapServers(servers, listenerName)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
+
+    // default implementation is a no-op, it is overridden by subclasses if required
+    configureSecurityAfterServersStart()
   }
 
   @After
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index f5c9f03..13299c7 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -61,7 +61,7 @@ class SocketServerTest extends JUnitSuite {
   props.put("connections.max.idle.ms", "60000")
   val config = KafkaConfig.fromProps(props)
   val metrics = new Metrics
-  val credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
+  val credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, null)
   val localAddress = InetAddress.getLoopbackAddress
 
   // Clean-up any metrics left around by previous tests
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
new file mode 100644
index 0000000..8c03548
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -0,0 +1,311 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.security.token.delegation
+
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.util.Properties
+
+import kafka.network.RequestChannel.Session
+import kafka.security.auth.Acl.WildCardHost
+import kafka.security.auth._
+import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.scram.ScramMechanism
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.utils.{MockTime, SecurityUtils}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+
+class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
+
+  val time = new MockTime()
+  val owner = SecurityUtils.parseKafkaPrincipal("User:owner")
+  val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer1"))
+
+  val masterKey = "masterKey"
+  val maxLifeTimeMsDefault = Defaults.DelegationTokenMaxLifeTimeMsDefault
+  val renewTimeMsDefault = Defaults.DelegationTokenExpiryTimeMsDefault
+  var tokenCache: DelegationTokenCache = null
+  var props: Properties = null
+
+  var createTokenResult: CreateTokenResult = _
+  var error: Errors = Errors.NONE
+  var expiryTimeStamp: Long = 0
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    props = TestUtils.createBrokerConfig(0, zkConnect, enableToken = true)
+    props.put(KafkaConfig.SaslEnabledMechanismsProp, ScramMechanism.mechanismNames().asScala.mkString(","))
+    props.put(KafkaConfig.DelegationTokenMasterKeyProp, masterKey)
+    tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames())
+  }
+
+  @Test
+  def testTokenRequestsWithDelegationTokenDisabled(): Unit = {
+    val props: Properties = TestUtils.createBrokerConfig(0, zkConnect)
+    val config = KafkaConfig.fromProps(props)
+    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+
+    tokenManager.createToken(owner, renewer, -1, createTokenResultCallBack)
+    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createTokenResult.error)
+    assert(Array[Byte]() sameElements createTokenResult.hmac)
+
+    tokenManager.renewToken(owner, ByteBuffer.wrap("test".getBytes), 1000000, renewResponseCallback)
+    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, error)
+
+    tokenManager.expireToken(owner, ByteBuffer.wrap("test".getBytes), 1000000, renewResponseCallback)
+    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, error)
+  }
+
+  @Test
+  def testCreateToken(): Unit = {
+    val config = KafkaConfig.fromProps(props)
+    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    tokenManager.startup
+
+    tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
+    val issueTime = time.milliseconds
+    val tokenId = createTokenResult.tokenId
+    val password = DelegationTokenManager.createHmac(tokenId, masterKey)
+    assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault,  issueTime + maxLifeTimeMsDefault, tokenId, password, Errors.NONE), createTokenResult)
+
+    val token = tokenManager.getToken(tokenId)
+    assertTrue(!token.isEmpty )
+    assertTrue(password sameElements token.get.hmac)
+  }
+
+  @Test
+  def testRenewToken(): Unit = {
+    val config = KafkaConfig.fromProps(props)
+    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    tokenManager.startup
+
+    tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
+    val issueTime = time.milliseconds
+    val maxLifeTime = issueTime + maxLifeTimeMsDefault
+    val tokenId = createTokenResult.tokenId
+    val password = DelegationTokenManager.createHmac(tokenId, masterKey)
+    assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault,  maxLifeTime, tokenId, password, Errors.NONE), createTokenResult)
+
+    //try renewing non-existing token
+    tokenManager.renewToken(owner, ByteBuffer.wrap("test".getBytes), -1 , renewResponseCallback)
+    assertEquals(Errors.DELEGATION_TOKEN_NOT_FOUND, error)
+
+    //try renew non-owned tokens
+    val unknownOwner = SecurityUtils.parseKafkaPrincipal("User:Unknown")
+    tokenManager.renewToken(unknownOwner, ByteBuffer.wrap(password), -1 , renewResponseCallback)
+    assertEquals(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, error)
+
+    // try renew with default time period
+    time.sleep(24 * 60 * 60 * 1000L)
+    var expectedExpiryStamp = time.milliseconds + renewTimeMsDefault
+    tokenManager.renewToken(owner, ByteBuffer.wrap(password), -1 , renewResponseCallback)
+    assertEquals(expectedExpiryStamp, expiryTimeStamp)
+    assertEquals(Errors.NONE, error)
+
+    // try renew with specific time period
+    time.sleep(24 * 60 * 60 * 1000L)
+    expectedExpiryStamp = time.milliseconds + 1 * 60 * 60 * 1000L
+    tokenManager.renewToken(owner, ByteBuffer.wrap(password), 1 * 60 * 60 * 1000L , renewResponseCallback)
+    assertEquals(expectedExpiryStamp, expiryTimeStamp)
+    assertEquals(Errors.NONE, error)
+
+    //try renewing more than max time period
+    time.sleep( 1 * 60 * 60 * 1000L)
+    tokenManager.renewToken(owner, ByteBuffer.wrap(password), 8 * 24 * 60 * 60 * 1000L, renewResponseCallback)
+    assertEquals(maxLifeTime, expiryTimeStamp)
+    assertEquals(Errors.NONE, error)
+
+    //try renewing expired token
+    time.sleep(8 * 24 * 60 * 60 * 1000L)
+    tokenManager.renewToken(owner, ByteBuffer.wrap(password), -1 , renewResponseCallback)
+    assertEquals(Errors.DELEGATION_TOKEN_EXPIRED, error)
+  }
+
+  @Test
+  def testExpireToken(): Unit = {
+    val config = KafkaConfig.fromProps(props)
+    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    tokenManager.startup
+
+    tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
+    val issueTime = time.milliseconds
+    val tokenId = createTokenResult.tokenId
+    val password = DelegationTokenManager.createHmac(tokenId, masterKey)
+    assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault,  issueTime + maxLifeTimeMsDefault, tokenId, password, Errors.NONE), createTokenResult)
+
+    //try expire non-existing token
+    tokenManager.expireToken(owner, ByteBuffer.wrap("test".getBytes), -1 , renewResponseCallback)
+    assertEquals(Errors.DELEGATION_TOKEN_NOT_FOUND, error)
+
+    //try expire non-owned tokens
+    val unknownOwner = SecurityUtils.parseKafkaPrincipal("User:Unknown")
+    tokenManager.expireToken(unknownOwner, ByteBuffer.wrap(password), -1 , renewResponseCallback)
+    assertEquals(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, error)
+
+    //try expire token at a timestamp
+    time.sleep(24 * 60 * 60 * 1000L)
+    val expectedExpiryStamp = time.milliseconds + 2 * 60 * 60 * 1000L
+    tokenManager.expireToken(owner, ByteBuffer.wrap(password), 2 * 60 * 60 * 1000L, renewResponseCallback)
+    assertEquals(expectedExpiryStamp, expiryTimeStamp)
+
+    //try expire token immediately
+    time.sleep(1 * 60 * 60 * 1000L)
+    tokenManager.expireToken(owner, ByteBuffer.wrap(password), -1, renewResponseCallback)
+    assert(tokenManager.getToken(tokenId).isEmpty)
+    assertEquals(Errors.NONE, error)
+    assertEquals(time.milliseconds, expiryTimeStamp)
+  }
+
+  @Test
+  def testDescribeToken(): Unit = {
+
+    val config = KafkaConfig.fromProps(props)
+
+    val owner1 = SecurityUtils.parseKafkaPrincipal("User:owner1")
+    val owner2 = SecurityUtils.parseKafkaPrincipal("User:owner2")
+    val owner3 = SecurityUtils.parseKafkaPrincipal("User:owner3")
+    val owner4 = SecurityUtils.parseKafkaPrincipal("User:owner4")
+
+    val renewer1 = SecurityUtils.parseKafkaPrincipal("User:renewer1")
+    val renewer2 = SecurityUtils.parseKafkaPrincipal("User:renewer2")
+    val renewer3 = SecurityUtils.parseKafkaPrincipal("User:renewer3")
+    val renewer4 = SecurityUtils.parseKafkaPrincipal("User:renewer4")
+
+    val simpleAclAuthorizer = new SimpleAclAuthorizer
+    simpleAclAuthorizer.configure(config.originals)
+
+    var hostSession = new Session(owner1, InetAddress.getByName("192.168.1.1"))
+
+    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    tokenManager.startup
+
+    //create tokens
+    tokenManager.createToken(owner1, List(renewer1, renewer2), 1 * 60 * 60 * 1000L, createTokenResultCallBack)
+
+    tokenManager.createToken(owner2, List(renewer3), 1 * 60 * 60 * 1000L, createTokenResultCallBack)
+    val tokenId2 = createTokenResult.tokenId
+
+    tokenManager.createToken(owner3, List(renewer4), 2 * 60 * 60 * 1000L, createTokenResultCallBack)
+    val tokenId3 = createTokenResult.tokenId
+
+    tokenManager.createToken(owner4, List(owner1, renewer4), 2 * 60 * 60 * 1000L, createTokenResultCallBack)
+
+    assert(tokenManager.getAllTokenInformation().size == 4 )
+
+    //get tokens for non-existing owner
+    var  tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(SecurityUtils.parseKafkaPrincipal("User:unknown")))
+    assert(tokens.size == 0)
+
+    //get all tokens for empty owner list
+    tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List())
+    assert(tokens.size == 0)
+
+    //get all tokens for owner1
+    tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1))
+    assert(tokens.size == 2)
+
+    //get all tokens for owner1 when no owner list is given (null)
+    tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, null)
+    assert(tokens.size == 2)
+
+    //get all tokens for unknown owner
+    tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, SecurityUtils.parseKafkaPrincipal("User:unknown"), null)
+    assert(tokens.size == 0)
+
+    //get all tokens for multiple owners (owner1, renewer4) and without permission for renewer4
+    tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1, renewer4))
+    assert(tokens.size == 2)
+
+    //get all tokens for multiple owners (owner1, renewer4) and with permission
+    var acl = new Acl(owner1, Allow, WildCardHost, Describe)
+    simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId3))
+    tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1, renewer4))
+    assert(tokens.size == 3)
+
+    //get all tokens for renewer4 which is a renewer principal for some tokens
+    tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession,  renewer4, List(renewer4))
+    assert(tokens.size == 2)
+
+    //get all tokens for multiple owners (renewer2, renewer3) which are token renewer principals and without permissions for renewer3
+    tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession,  renewer2, List(renewer2, renewer3))
+    assert(tokens.size == 1)
+
+    //get all tokens for multiple owners (renewer2, renewer3) which are token renewer principals and with permissions
+    hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1"))
+    acl = new Acl(renewer2, Allow, WildCardHost, Describe)
+    simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId2))
+    tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession,  renewer2, List(renewer2, renewer3))
+    assert(tokens.size == 2)
+
+    simpleAclAuthorizer.close()
+  }
+
+  private def getTokens(tokenManager: DelegationTokenManager, simpleAclAuthorizer: SimpleAclAuthorizer, hostSession: Session,
+                        requestPrincipal: KafkaPrincipal, requestedOwners: List[KafkaPrincipal]): List[DelegationToken] = {
+
+    if (requestedOwners != null && requestedOwners.isEmpty) {
+      List()
+    }
+    else {
+      def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId))
+      def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, Option(requestedOwners), token, authorizeToken)
+      tokenManager.getTokens(eligible)
+    }
+  }
+
+  @Test
+  def testPeriodicTokenExpiry(): Unit = {
+    val config = KafkaConfig.fromProps(props)
+    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    tokenManager.startup
+
+    //create tokens
+    tokenManager.createToken(owner, renewer, 1 * 60 * 60 * 1000L, createTokenResultCallBack)
+    tokenManager.createToken(owner, renewer, 1 * 60 * 60 * 1000L, createTokenResultCallBack)
+    tokenManager.createToken(owner, renewer, 2 * 60 * 60 * 1000L, createTokenResultCallBack)
+    tokenManager.createToken(owner, renewer, 2 * 60 * 60 * 1000L, createTokenResultCallBack)
+    assert(tokenManager.getAllTokenInformation().size == 4 )
+
+    time.sleep(2 * 60 * 60 * 1000L)
+    tokenManager.expireTokens()
+    assert(tokenManager.getAllTokenInformation().size == 2 )
+
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+  }
+
+  private def createTokenResultCallBack(ret: CreateTokenResult): Unit = {
+    createTokenResult = ret
+  }
+
+  private def renewResponseCallback(ret: Errors, timeStamp: Long): Unit = {
+    error = ret
+    expiryTimeStamp = timeStamp
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
new file mode 100644
index 0000000..4c42dd2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -0,0 +1,76 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.nio.ByteBuffer
+import java.util
+
+import kafka.admin.AdminClient
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.protocol.Errors
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+
+class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
+  // kafka.admin.AdminClient created by the test; closed in tearDown when non-null
+  var adminClient: AdminClient = null
+
+  override def numBrokers = 1
+
+  @Before
+  override def setUp(): Unit = super.setUp()
+
+  // Admin client settings: the bootstrap servers plus whatever TestUtils.adminClientSecurityConfigs returns.
+  def createAdminConfig():util.Map[String, Object] = {
+    val adminProps = new util.HashMap[String, Object]
+    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    for ((key, value) <- securityProps.asScala) adminProps.put(key.asInstanceOf[String], value)
+    adminProps
+  }
+
+  // Each of the four delegation token requests must come back as DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.
+  @Test
+  def testDelegationTokenRequests(): Unit = {
+    adminClient = AdminClient.create(createAdminConfig().asScala.toMap)
+    val notAllowed = Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
+    def emptyBuffer = ByteBuffer.wrap("".getBytes())
+
+    // create
+    assertEquals(notAllowed, adminClient.createToken(List())._1)
+
+    // describe
+    assertEquals(notAllowed, adminClient.describeToken(List())._1)
+
+    // renew
+    assertEquals(notAllowed, adminClient.renewToken(emptyBuffer)._1)
+
+    // expire
+    assertEquals(notAllowed, adminClient.expireToken(emptyBuffer)._1)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (adminClient != null)
+      adminClient.close()
+    super.tearDown()
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
new file mode 100644
index 0000000..55bf5fd
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -0,0 +1,116 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.util
+
+import kafka.admin.AdminClient
+import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.SecurityUtils
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+
+// Runs create/describe/renew/expire token requests over SASL_PLAINTEXT (PLAIN) against a broker configured with enableToken = true.
+class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List("PLAIN")
+  protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+  var adminClient: AdminClient = null
+
+  override def numBrokers = 1
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  def createAdminConfig():util.Map[String, Object] = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
+  override def generateConfigs = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+      enableControlledShutdown = false, enableDeleteTopic = true,
+      interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
+    props.foreach(propertyOverrides)
+    props.map(KafkaConfig.fromProps)
+  }
+
+  @Test
+  def testDelegationTokenRequests(): Unit = {
+    adminClient = AdminClient.create(createAdminConfig().asScala.toMap)
+
+    // test creating token: the token returned by create is the expected value for describe
+    val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
+    val tokenResult1 = adminClient.createToken(renewer1)
+    assertEquals(Errors.NONE, tokenResult1._1)
+    var token1 = adminClient.describeToken(null)._2.head
+    assertEquals(tokenResult1._2, token1)
+
+    //test renewing tokens
+    val renewResponse = adminClient.renewToken(token1.hmacBuffer())
+    assertEquals(Errors.NONE, renewResponse._1)
+    // the described token must carry the expiry timestamp returned by the renew call
+    token1 = adminClient.describeToken(null)._2.head
+    assertEquals(renewResponse._2, token1.tokenInfo().expiryTimestamp())
+
+    //test describe tokens
+    val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:Renewer1"))
+    val tokenResult2 = adminClient.createToken(renewer2)
+    assertEquals(Errors.NONE, tokenResult2._1)
+    val token2 = tokenResult2._2
+    // assertEquals (not assertTrue on ==) so a failure reports the actual number of tokens
+    assertEquals(2, adminClient.describeToken(null)._2.size)
+
+    //test expire tokens
+    val expireResponse1 = adminClient.expireToken(token1.hmacBuffer())
+    assertEquals(Errors.NONE, expireResponse1._1)
+
+    val expireResponse2 = adminClient.expireToken(token2.hmacBuffer())
+    assertEquals(Errors.NONE, expireResponse2._1)
+    // after expiring both tokens, describe must return none
+    assertEquals(0, adminClient.describeToken(null)._2.size)
+
+    //create token with invalid principal type
+    val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer1"))
+    val tokenResult3 = adminClient.createToken(renewer3)
+    assertEquals(Errors.INVALID_PRINCIPAL_TYPE, tokenResult3._1)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (adminClient != null)
+      adminClient.close()
+    super.tearDown()
+    closeSasl()
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
new file mode 100644
index 0000000..0561cac
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -0,0 +1,87 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.nio.ByteBuffer
+import java.util
+
+import kafka.admin.AdminClient
+import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.SecurityUtils
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+
+import scala.collection.JavaConverters._
+
+class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest with SaslSetup {
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List("PLAIN")
+  protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+  var adminClient: AdminClient = null
+
+  override def numBrokers = 1
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  def createAdminConfig():util.Map[String, Object] = {
+    val adminProps = new util.HashMap[String, Object]
+    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    for ((key, value) <- securityProps.asScala) adminProps.put(key.asInstanceOf[String], value)
+    adminProps
+  }
+
+  // Each of the four delegation token requests must come back as DELEGATION_TOKEN_AUTH_DISABLED.
+  @Test
+  def testDelegationTokenRequests(): Unit = {
+    adminClient = AdminClient.create(createAdminConfig().asScala.toMap)
+    val disabled = Errors.DELEGATION_TOKEN_AUTH_DISABLED
+    def emptyBuffer = ByteBuffer.wrap("".getBytes())
+
+    // create
+    val renewers = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
+    assertEquals(disabled, adminClient.createToken(renewers)._1)
+
+    // describe
+    assertEquals(disabled, adminClient.describeToken(List())._1)
+
+    // renew
+    assertEquals(disabled, adminClient.renewToken(emptyBuffer)._1)
+
+    // expire
+    assertEquals(disabled, adminClient.expireToken(emptyBuffer)._1)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (adminClient != null)
+      adminClient.close()
+    super.tearDown()
+    closeSasl()
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a85a10b..8e907d9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -98,7 +98,8 @@ class KafkaApisTest {
       quotas,
       brokerTopicStats,
       clusterId,
-      time
+      time,
+      null
     )
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 9c459d8..748efec 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -674,6 +674,12 @@ class KafkaConfigTest {
         case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
         case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
         case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
+
+        //delegation token configs
+        case KafkaConfig.DelegationTokenMasterKeyProp => // ignore
+        case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })
@@ -718,6 +724,14 @@ class KafkaConfigTest {
     assertEquals(123L, config.logFlushIntervalMs)
     assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
     assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
+    assertEquals(false, config.tokenAuthEnabled)
+    assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs)
+    assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
+    assertEquals(1 * 60L * 1000L * 60, config.delegationTokenExpiryCheckIntervalMs)
+
+    defaults.put(KafkaConfig.DelegationTokenMasterKeyProp, "1234567890")
+    val config1 = KafkaConfig.fromProps(defaults)
+    assertEquals(true, config1.tokenAuthEnabled)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 886c318..a90fa64 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
 import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
 import org.apache.kafka.common.utils.Sanitizer
+import org.apache.kafka.common.utils.SecurityUtils
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -305,6 +306,18 @@ class RequestQuotaTest extends BaseRequestTest {
             Collections.singletonMap("topic-2", NewPartitions.increaseTo(1)), 0, false
           )
 
+        case ApiKeys.CREATE_DELEGATION_TOKEN =>
+          new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000)
+
+        case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
+          new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+
+        case ApiKeys.DESCRIBE_DELEGATION_TOKEN=>
+          new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
+
+        case ApiKeys.RENEW_DELEGATION_TOKEN=>
+          new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
@@ -399,6 +412,10 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.ALTER_REPLICA_LOG_DIRS => new AlterReplicaLogDirsResponse(response).throttleTimeMs
       case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs
       case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
+      case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response).throttleTimeMs
+      case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
+      case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
+      case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 1f9ccf3..9ce3b01 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -72,14 +72,15 @@ object JaasTestUtils {
 
   case class ScramLoginModule(username: String,
                               password: String,
-                              debug: Boolean = false) extends JaasModule {
+                              debug: Boolean = false,
+                              tokenProps: Map[String, String] = Map.empty) extends JaasModule {
 
     def name = "org.apache.kafka.common.security.scram.ScramLoginModule"
 
     def entries: Map[String, String] = Map(
       "username" -> username,
       "password" -> password
-    )
+    ) ++ tokenProps // extra login-module options appended as-is (no identity map, no shadowing of `name`)
   }
 
   sealed trait JaasModule {
@@ -157,6 +158,16 @@ object JaasTestUtils {
   def clientLoginModule(mechanism: String, keytabLocation: Option[File]): String =
     kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString
 
+  /**
+   * Returns the toString form of a ScramLoginModule whose username is the token id and which sets tokenauth=true.
+   */
+  def tokenClientLoginModule(tokenId: String, password: String): String = {
+    val tokenAuthOptions = Map("tokenauth" -> "true")
+    val loginModule =
+      ScramLoginModule(username = tokenId, password = password, debug = false, tokenProps = tokenAuthOptions)
+    loginModule.toString
+  }
+
   def zkSections: Seq[JaasSection] = Seq(
     JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false,
       Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2c2d9dd..09e4c94 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -158,11 +158,12 @@ object TestUtils extends Logging {
     enableSaslPlaintext: Boolean = false,
     enableSaslSsl: Boolean = false,
     rackInfo: Map[Int, String] = Map(),
-    logDirCount: Int = 1): Seq[Properties] = {
+    logDirCount: Int = 1,
+    enableToken: Boolean = false): Seq[Properties] = {
     (0 until numConfigs).map { node =>
       createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort,
         interBrokerSecurityProtocol, trustStoreFile, saslProperties, enablePlaintext = enablePlaintext, enableSsl = enableSsl,
-        enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node), logDirCount = logDirCount)
+        enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node), logDirCount = logDirCount, enableToken = enableToken)
     }
   }
 
@@ -213,7 +214,8 @@ object TestUtils extends Logging {
                          enableSaslSsl: Boolean = false,
                          saslSslPort: Int = RandomPort,
                          rack: Option[String] = None,
-                         logDirCount: Int = 1): Properties = {
+                         logDirCount: Int = 1,
+                         enableToken: Boolean = false): Properties = {
     def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
 
     val protocolAndPorts = ArrayBuffer[(SecurityProtocol, Int)]()
@@ -270,6 +272,9 @@ object TestUtils extends Logging {
       props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name)
     }
 
+    if (enableToken)
+      props.put(KafkaConfig.DelegationTokenMasterKeyProp, "masterkey")
+
     props
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index f3b8e81..d3726c2 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -28,10 +28,15 @@ import kafka.utils.CoreUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.token.delegation.TokenInformation
+import org.apache.kafka.common.utils.SecurityUtils
 import org.apache.zookeeper.KeeperException.NodeExistsException
 import org.junit.Assert._
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+import scala.util.Random
+
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
@@ -472,4 +477,39 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     data.map(new String(_, UTF_8))
   }
 
+  @Test
+  def testDelegationTokenMethods(): Unit = {
+    // neither token path exists until createDelegationTokenPaths is called
+    assertFalse(zkClient.pathExists(DelegationTokensZNode.path))
+    assertFalse(zkClient.pathExists(DelegationTokenChangeNotificationZNode.path))
+    zkClient.createDelegationTokenPaths
+    assertTrue(zkClient.pathExists(DelegationTokensZNode.path))
+    assertTrue(zkClient.pathExists(DelegationTokenChangeNotificationZNode.path))
+
+    val tokenId = "token1"
+    val owner = SecurityUtils.parseKafkaPrincipal("User:owner1")
+    // two distinct renewers, so the store/read round trip covers more than one entry
+    val renewers = List(SecurityUtils.parseKafkaPrincipal("User:renewer1"), SecurityUtils.parseKafkaPrincipal("User:renewer2"))
+    val now = System.currentTimeMillis()
+    val tokenInfo = new TokenInformation(tokenId, owner, renewers.asJava, now, now, now)
+    val bytes = new Array[Byte](20)
+    Random.nextBytes(bytes)
+    val token = new org.apache.kafka.common.security.token.delegation.DelegationToken(tokenInfo, bytes)
+
+    // test non-existent token
+    assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+
+    // create a token
+    zkClient.setOrCreateDelegationToken(token)
+
+    // get created token
+    assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+    // update expiryTime to a value guaranteed to differ from the stored one, then overwrite the token
+    tokenInfo.setExpiryTimestamp(now + 1000L)
+    zkClient.setOrCreateDelegationToken(token)
+    // test updated token: the new expiry must be what is read back
+    assertEquals(now + 1000L, zkClient.getDelegationTokenInfo(tokenId).get.expiryTimestamp())
+    assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].