You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/01/16 17:50:25 UTC
[kafka] branch trunk updated: KAFKA-4541;
Support for delegation token mechanism
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27a8d0f KAFKA-4541; Support for delegation token mechanism
27a8d0f is described below
commit 27a8d0f9e7f3b05f331d1449b17cf7e085a4b45c
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Tue Jan 16 09:50:31 2018 -0800
KAFKA-4541; Support for delegation token mechanism
- Add capability to create delegation tokens
- Add authentication based on delegation token.
- Add capability to renew/expire delegation tokens.
- Add unit tests and integration tests
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #3616 from omkreddy/KAFKA-4541
---
bin/kafka-delegation-tokens.sh | 17 +
bin/windows/kafka-delegation-tokens.bat | 17 +
checkstyle/suppressions.xml | 5 +-
.../DelegationTokenAuthorizationException.java} | 18 +-
.../DelegationTokenDisabledException.java} | 18 +-
.../DelegationTokenExpiredException.java} | 18 +-
.../DelegationTokenNotFoundException.java} | 18 +-
.../DelegationTokenOwnerMismatchException.java} | 18 +-
.../InvalidPrincipalTypeException.java} | 18 +-
.../UnsupportedByAuthenticationException.java} | 22 +-
.../kafka/common/network/ChannelBuilders.java | 13 +-
.../kafka/common/network/SaslChannelBuilder.java | 10 +-
.../org/apache/kafka/common/protocol/ApiKeys.java | 14 +-
.../apache/kafka/common/protocol/CommonFields.java | 4 +
.../org/apache/kafka/common/protocol/Errors.java | 49 ++
.../kafka/common/requests/AbstractRequest.java | 8 +
.../kafka/common/requests/AbstractResponse.java | 8 +
.../requests/CreateDelegationTokenRequest.java | 136 ++++++
.../requests/CreateDelegationTokenResponse.java | 168 +++++++
.../requests/DescribeDelegationTokenRequest.java | 132 ++++++
.../requests/DescribeDelegationTokenResponse.java | 187 ++++++++
.../requests/ExpireDelegationTokenRequest.java | 112 +++++
.../requests/ExpireDelegationTokenResponse.java | 96 ++++
.../requests/RenewDelegationTokenRequest.java | 112 +++++
.../requests/RenewDelegationTokenResponse.java | 96 ++++
.../apache/kafka/common/resource/ResourceType.java | 7 +-
.../kafka/common/security/auth/KafkaPrincipal.java | 15 +-
.../DefaultKafkaPrincipalBuilder.java | 10 +-
.../authenticator/SaslClientCallbackHandler.java | 8 +-
.../authenticator/SaslServerAuthenticator.java | 9 +-
... => DelegationTokenAuthenticationCallback.java} | 13 +-
.../security/scram/ScramCredentialCallback.java | 24 +
.../common/security/scram/ScramLoginModule.java | 4 +
.../kafka/common/security/scram/ScramMessages.java | 34 +-
.../common/security/scram/ScramSaslClient.java | 8 +-
.../common/security/scram/ScramSaslServer.java | 19 +-
.../security/scram/ScramServerCallbackHandler.java | 17 +-
.../security/token/delegation/DelegationToken.java | 80 ++++
.../token/delegation/DelegationTokenCache.java | 111 +++++
.../token/delegation/TokenInformation.java | 136 ++++++
.../apache/kafka/common/network/NioEchoServer.java | 10 +-
.../common/network/SaslChannelBuilderTest.java | 2 +-
.../kafka/common/protocol/ProtoUtilsTest.java | 3 +-
.../kafka/common/requests/RequestResponseTest.java | 70 +++
.../kafka/common/resource/ResourceTypeTest.java | 3 +-
.../auth/DefaultKafkaPrincipalBuilderTest.java | 2 +
.../authenticator/SaslAuthenticatorTest.java | 53 ++-
.../authenticator/SaslServerAuthenticatorTest.java | 4 +-
.../common/security/scram/ScramMessagesTest.java | 7 +-
.../common/security/scram/ScramSaslServerTest.java | 3 +-
core/src/main/scala/kafka/admin/AclCommand.scala | 14 +-
core/src/main/scala/kafka/admin/AdminClient.scala | 33 +-
.../scala/kafka/admin/DelegationTokenCommand.scala | 214 +++++++++
.../scala/kafka/controller/KafkaController.scala | 19 +-
.../main/scala/kafka/network/SocketServer.scala | 2 +-
.../scala/kafka/security/CredentialProvider.scala | 3 +-
.../scala/kafka/security/auth/ResourceType.scala | 8 +-
.../kafka/server/DelegationTokenManager.scala | 515 +++++++++++++++++++++
core/src/main/scala/kafka/server/KafkaApis.scala | 130 +++++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 31 ++
core/src/main/scala/kafka/server/KafkaServer.scala | 18 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 4 +-
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 73 +++
core/src/main/scala/kafka/zk/ZkData.scala | 38 +-
.../DelegationTokenEndToEndAuthorizationTest.scala | 96 ++++
.../scala/unit/kafka/admin/AclCommandTest.scala | 7 +-
.../kafka/integration/KafkaServerTestHarness.scala | 9 +
.../unit/kafka/network/SocketServerTest.scala | 2 +-
.../delegation/DelegationTokenManagerTest.scala | 311 +++++++++++++
.../DelegationTokenRequestsOnPlainTextTest.scala | 76 +++
.../kafka/server/DelegationTokenRequestsTest.scala | 116 +++++
...nTokenRequestsWithDisableTokenFeatureTest.scala | 87 ++++
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 14 +
.../scala/unit/kafka/server/RequestQuotaTest.scala | 17 +
.../scala/unit/kafka/utils/JaasTestUtils.scala | 15 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 11 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 40 ++
78 files changed, 3732 insertions(+), 140 deletions(-)
diff --git a/bin/kafka-delegation-tokens.sh b/bin/kafka-delegation-tokens.sh
new file mode 100755
index 0000000..49cb276
--- /dev/null
+++ b/bin/kafka-delegation-tokens.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.DelegationTokenCommand "$@"
diff --git a/bin/windows/kafka-delegation-tokens.bat b/bin/windows/kafka-delegation-tokens.bat
new file mode 100644
index 0000000..996537f
--- /dev/null
+++ b/bin/windows/kafka-delegation-tokens.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+"%~dp0kafka-run-class.bat" kafka.admin.DelegationTokenCommand %*
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f610e88..05d4a52 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -51,14 +51,11 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
<suppress checks="CyclomaticComplexity"
- files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer|SaslClientAuthenticator).java"/>
+ files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
<suppress checks="JavaNCSS"
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
- <suppress checks="JavaNCSS"
- files="AbstractRequest.java"/>
-
<suppress checks="NPathComplexity"
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenAuthorizationException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenAuthorizationException.java
index 931210a..ddc97c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenAuthorizationException.java
@@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
-import javax.security.auth.callback.Callback;
+public class DelegationTokenAuthorizationException extends AuthorizationException {
-public class ScramCredentialCallback implements Callback {
- private ScramCredential scramCredential;
+ private static final long serialVersionUID = 1L;
- public ScramCredential scramCredential() {
- return scramCredential;
+ public DelegationTokenAuthorizationException(String message) {
+ super(message);
}
- public void scramCredential(ScramCredential scramCredential) {
- this.scramCredential = scramCredential;
+ public DelegationTokenAuthorizationException(String message, Throwable cause) {
+ super(message, cause);
}
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenDisabledException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenDisabledException.java
index 931210a..798611e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenDisabledException.java
@@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
-import javax.security.auth.callback.Callback;
+public class DelegationTokenDisabledException extends ApiException {
-public class ScramCredentialCallback implements Callback {
- private ScramCredential scramCredential;
+ private static final long serialVersionUID = 1L;
- public ScramCredential scramCredential() {
- return scramCredential;
+ public DelegationTokenDisabledException(String message) {
+ super(message);
}
- public void scramCredential(ScramCredential scramCredential) {
- this.scramCredential = scramCredential;
+ public DelegationTokenDisabledException(String message, Throwable cause) {
+ super(message, cause);
}
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenExpiredException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenExpiredException.java
index 931210a..4dae7f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenExpiredException.java
@@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
-import javax.security.auth.callback.Callback;
+public class DelegationTokenExpiredException extends ApiException {
-public class ScramCredentialCallback implements Callback {
- private ScramCredential scramCredential;
+ private static final long serialVersionUID = 1L;
- public ScramCredential scramCredential() {
- return scramCredential;
+ public DelegationTokenExpiredException(String message) {
+ super(message);
}
- public void scramCredential(ScramCredential scramCredential) {
- this.scramCredential = scramCredential;
+ public DelegationTokenExpiredException(String message, Throwable cause) {
+ super(message, cause);
}
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenNotFoundException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenNotFoundException.java
index 931210a..5875edf 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenNotFoundException.java
@@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
-import javax.security.auth.callback.Callback;
+public class DelegationTokenNotFoundException extends ApiException {
-public class ScramCredentialCallback implements Callback {
- private ScramCredential scramCredential;
+ private static final long serialVersionUID = 1L;
- public ScramCredential scramCredential() {
- return scramCredential;
+ public DelegationTokenNotFoundException(String message) {
+ super(message);
}
- public void scramCredential(ScramCredential scramCredential) {
- this.scramCredential = scramCredential;
+ public DelegationTokenNotFoundException(String message, Throwable cause) {
+ super(message, cause);
}
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenOwnerMismatchException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenOwnerMismatchException.java
index 931210a..5c8239e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DelegationTokenOwnerMismatchException.java
@@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
-import javax.security.auth.callback.Callback;
+public class DelegationTokenOwnerMismatchException extends ApiException {
-public class ScramCredentialCallback implements Callback {
- private ScramCredential scramCredential;
+ private static final long serialVersionUID = 1L;
- public ScramCredential scramCredential() {
- return scramCredential;
+ public DelegationTokenOwnerMismatchException(String message) {
+ super(message);
}
- public void scramCredential(ScramCredential scramCredential) {
- this.scramCredential = scramCredential;
+ public DelegationTokenOwnerMismatchException(String message, Throwable cause) {
+ super(message, cause);
}
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPrincipalTypeException.java
similarity index 67%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/InvalidPrincipalTypeException.java
index 931210a..a0736e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPrincipalTypeException.java
@@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
-import javax.security.auth.callback.Callback;
+public class InvalidPrincipalTypeException extends ApiException {
-public class ScramCredentialCallback implements Callback {
- private ScramCredential scramCredential;
+ private static final long serialVersionUID = 1L;
- public ScramCredential scramCredential() {
- return scramCredential;
+ public InvalidPrincipalTypeException(String message) {
+ super(message);
}
- public void scramCredential(ScramCredential scramCredential) {
- this.scramCredential = scramCredential;
+ public InvalidPrincipalTypeException(String message, Throwable cause) {
+ super(message, cause);
}
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedByAuthenticationException.java
similarity index 64%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/errors/UnsupportedByAuthenticationException.java
index 931210a..40f357c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedByAuthenticationException.java
@@ -14,18 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.scram;
+package org.apache.kafka.common.errors;
-import javax.security.auth.callback.Callback;
-
-public class ScramCredentialCallback implements Callback {
- private ScramCredential scramCredential;
+/**
+ * Authentication mechanism does not support the requested function.
+ */
+public class UnsupportedByAuthenticationException extends ApiException {
+ private static final long serialVersionUID = 1L;
- public ScramCredential scramCredential() {
- return scramCredential;
+ public UnsupportedByAuthenticationException(String message) {
+ super(message);
}
- public void scramCredential(ScramCredential scramCredential) {
- this.scramCredential = scramCredential;
+ public UnsupportedByAuthenticationException(String message, Throwable cause) {
+ super(message, cause);
}
-}
\ No newline at end of file
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 74bd0a0..8f301ad 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;
@@ -59,7 +60,7 @@ public class ChannelBuilders {
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, clientSaslMechanism,
- saslHandshakeRequestEnable, null);
+ saslHandshakeRequestEnable, null, null);
}
/**
@@ -72,9 +73,10 @@ public class ChannelBuilders {
public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
SecurityProtocol securityProtocol,
AbstractConfig config,
- CredentialCache credentialCache) {
+ CredentialCache credentialCache,
+ DelegationTokenCache tokenCache) {
return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null,
- true, credentialCache);
+ true, credentialCache, tokenCache);
}
private static ChannelBuilder create(SecurityProtocol securityProtocol,
@@ -84,7 +86,8 @@ public class ChannelBuilders {
ListenerName listenerName,
String clientSaslMechanism,
boolean saslHandshakeRequestEnable,
- CredentialCache credentialCache) {
+ CredentialCache credentialCache,
+ DelegationTokenCache tokenCache) {
Map<String, ?> configs;
if (listenerName == null)
configs = config.values();
@@ -102,7 +105,7 @@ public class ChannelBuilders {
requireNonNullMode(mode, securityProtocol);
JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol, listenerName,
- clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
+ clientSaslMechanism, saslHandshakeRequestEnable, credentialCache, tokenCache);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder();
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 72a2bc1..d6dd5fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -22,11 +22,12 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
-import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.Java;
import org.slf4j.Logger;
@@ -53,6 +54,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
private final JaasContext jaasContext;
private final boolean handshakeRequestEnable;
private final CredentialCache credentialCache;
+ private final DelegationTokenCache tokenCache;
private LoginManager loginManager;
private SslFactory sslFactory;
@@ -65,7 +67,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
ListenerName listenerName,
String clientSaslMechanism,
boolean handshakeRequestEnable,
- CredentialCache credentialCache) {
+ CredentialCache credentialCache,
+ DelegationTokenCache tokenCache) {
this.mode = mode;
this.jaasContext = jaasContext;
this.securityProtocol = securityProtocol;
@@ -73,6 +76,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
this.handshakeRequestEnable = handshakeRequestEnable;
this.clientSaslMechanism = clientSaslMechanism;
this.credentialCache = credentialCache;
+ this.tokenCache = tokenCache;
}
@Override
@@ -153,7 +157,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
TransportLayer transportLayer, Subject subject) throws IOException {
return new SaslServerAuthenticator(configs, id, jaasContext, subject,
- kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer);
+ kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer, tokenCache);
}
// Visible to override for testing
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index cf1bff5..b408e80 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateAclsResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
+import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
+import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
@@ -53,8 +55,12 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
+import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
@@ -83,6 +89,8 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
@@ -171,7 +179,11 @@ public enum ApiKeys {
SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequest.schemaVersions(),
SaslAuthenticateResponse.schemaVersions()),
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
- CreatePartitionsResponse.schemaVersions());
+ CreatePartitionsResponse.schemaVersions()),
+ CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()),
+ RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
+ EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
+ DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions());
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index e1d1884..a436dff 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -50,4 +50,8 @@ public class CommonFields {
public static final Field.NullableStr HOST_FILTER = new Field.NullableStr("host", "The ACL host filter");
public static final Field.Int8 OPERATION = new Field.Int8("operation", "The ACL operation");
public static final Field.Int8 PERMISSION_TYPE = new Field.Int8("permission_type", "The ACL permission type");
+
+ public static final Field.Str PRINCIPAL_TYPE = new Field.Str("principal_type", "principalType of the Kafka principal");
+ public static final Field.Str PRINCIPAL_NAME = new Field.Str("name", "name of the Kafka principal");
+
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d937054..bd5b800 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -27,6 +28,9 @@ import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.DuplicateSequenceException;
+import org.apache.kafka.common.errors.DelegationTokenDisabledException;
+import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
+import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
@@ -68,6 +72,8 @@ import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.DelegationTokenAuthorizationException;
+import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
@@ -76,6 +82,7 @@ import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -545,6 +552,48 @@ public enum Errors {
public ApiException build(String message) {
return new ReassignmentInProgressException(message);
}
+ }),
+ DELEGATION_TOKEN_AUTH_DISABLED(61, "Delegation Token feature is not enabled.", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new DelegationTokenDisabledException(message);
+ }
+ }),
+ DELEGATION_TOKEN_NOT_FOUND(62, "Delegation Token is not found on server.", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new DelegationTokenNotFoundException(message);
+ }
+ }),
+ DELEGATION_TOKEN_OWNER_MISMATCH(63, "Specified Principal is not valid Owner/Renewer.", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new DelegationTokenOwnerMismatchException(message);
+ }
+ }),
+ DELEGATION_TOKEN_REQUEST_NOT_ALLOWED(64, "Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and " + "on delegation token authenticated channels.", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new UnsupportedByAuthenticationException(message);
+ }
+ }),
+ DELEGATION_TOKEN_AUTHORIZATION_FAILED(65, "Delegation Token authorization failed.", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new DelegationTokenAuthorizationException(message);
+ }
+ }),
+ DELEGATION_TOKEN_EXPIRED(66, "Delegation Token is expired.", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new DelegationTokenExpiredException(message);
+ }
+ }),
+ INVALID_PRINCIPAL_TYPE(67, "Supplied principalType is not supported", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new InvalidPrincipalTypeException(message);
+ }
});
private interface ApiExceptionBuilder {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 5a1c4f4..cd213d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -214,6 +214,14 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return new SaslAuthenticateRequest(struct, apiVersion);
case CREATE_PARTITIONS:
return new CreatePartitionsRequest(struct, apiVersion);
+ case CREATE_DELEGATION_TOKEN:
+ return new CreateDelegationTokenRequest(struct, apiVersion);
+ case RENEW_DELEGATION_TOKEN:
+ return new RenewDelegationTokenRequest(struct, apiVersion);
+ case EXPIRE_DELEGATION_TOKEN:
+ return new ExpireDelegationTokenRequest(struct, apiVersion);
+ case DESCRIBE_DELEGATION_TOKEN:
+ return new DescribeDelegationTokenRequest(struct, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 6294af4..fb01298 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -146,6 +146,14 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new SaslAuthenticateResponse(struct);
case CREATE_PARTITIONS:
return new CreatePartitionsResponse(struct);
+ case CREATE_DELEGATION_TOKEN:
+ return new CreateDelegationTokenResponse(struct);
+ case RENEW_DELEGATION_TOKEN:
+ return new RenewDelegationTokenResponse(struct);
+ case EXPIRE_DELEGATION_TOKEN:
+ return new ExpireDelegationTokenResponse(struct);
+ case DESCRIBE_DELEGATION_TOKEN:
+ return new DescribeDelegationTokenResponse(struct);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
new file mode 100644
index 0000000..7c48205
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
+
+public class CreateDelegationTokenRequest extends AbstractRequest {
+ private static final String RENEWERS_KEY_NAME = "renewers";
+ private static final String MAX_LIFE_TIME_KEY_NAME = "max_life_time";
+
+ private static final Schema TOKEN_CREATE_REQUEST_V0 = new Schema(
+ new Field(RENEWERS_KEY_NAME, new ArrayOf(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)),
+ "An array of token renewers. Renewer is an Kafka PrincipalType and name string," +
+ " who is allowed to renew this token before the max lifetime expires."),
+ new Field(MAX_LIFE_TIME_KEY_NAME, INT64,
+ "Max lifetime period for token in milli seconds. if value is -1, then max lifetime" +
+ " will default to a server side config value."));
+
+ private final List<KafkaPrincipal> renewers;
+ private final long maxLifeTime;
+
+ private CreateDelegationTokenRequest(short version, List<KafkaPrincipal> renewers, long maxLifeTime) {
+ super(version);
+ this.maxLifeTime = maxLifeTime;
+ this.renewers = renewers;
+ }
+
+ public CreateDelegationTokenRequest(Struct struct, short version) {
+ super(version);
+ maxLifeTime = struct.getLong(MAX_LIFE_TIME_KEY_NAME);
+ Object[] renewerArray = struct.getArray(RENEWERS_KEY_NAME);
+ renewers = new ArrayList<>();
+ if (renewerArray != null) {
+ for (Object renewerObj : renewerArray) {
+ Struct renewerObjStruct = (Struct) renewerObj;
+ String principalType = renewerObjStruct.get(PRINCIPAL_TYPE);
+ String principalName = renewerObjStruct.get(PRINCIPAL_NAME);
+ renewers.add(new KafkaPrincipal(principalType, principalName));
+ }
+ }
+ }
+
+ public static CreateDelegationTokenRequest parse(ByteBuffer buffer, short version) {
+ return new CreateDelegationTokenRequest(ApiKeys.CREATE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
+ }
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{TOKEN_CREATE_REQUEST_V0};
+ }
+
+ @Override
+ protected Struct toStruct() {
+ short version = version();
+ Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.requestSchema(version));
+ Object[] renewersArray = new Object[renewers.size()];
+
+ int i = 0;
+ for (KafkaPrincipal principal: renewers) {
+ Struct renewerStruct = struct.instance(RENEWERS_KEY_NAME);
+ renewerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType());
+ renewerStruct.set(PRINCIPAL_NAME, principal.getName());
+ renewersArray[i++] = renewerStruct;
+ }
+
+ struct.set(RENEWERS_KEY_NAME, renewersArray);
+ struct.set(MAX_LIFE_TIME_KEY_NAME, maxLifeTime);
+ return struct;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new CreateDelegationTokenResponse(throttleTimeMs, Errors.forException(e), KafkaPrincipal.ANONYMOUS);
+ }
+
+ public List<KafkaPrincipal> renewers() {
+ return renewers;
+ }
+
+ public long maxLifeTime() {
+ return maxLifeTime;
+ }
+
+ public static class Builder extends AbstractRequest.Builder<CreateDelegationTokenRequest> {
+ private final List<KafkaPrincipal> renewers;
+ private final long maxLifeTime;
+
+ public Builder(List<KafkaPrincipal> renewers, long maxLifeTime) {
+ super(ApiKeys.CREATE_DELEGATION_TOKEN);
+ this.renewers = renewers;
+ this.maxLifeTime = maxLifeTime;
+ }
+
+ @Override
+ public CreateDelegationTokenRequest build(short version) {
+ return new CreateDelegationTokenRequest(version, renewers, maxLifeTime);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(type: CreateDelegationTokenRequest").
+ append(", renewers=").append(renewers).
+ append(", maxLifeTime=").append(maxLifeTime).
+ append(")");
+ return bld.toString();
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
new file mode 100644
index 0000000..c718cf0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
+public class CreateDelegationTokenResponse extends AbstractResponse {
+
+ private static final String OWNER_KEY_NAME = "owner";
+ private static final String ISSUE_TIMESTAMP_KEY_NAME = "issue_timestamp";
+ private static final String EXPIRY_TIMESTAMP_NAME = "expiry_timestamp";
+ private static final String MAX_TIMESTAMP_NAME = "max_timestamp";
+ private static final String TOKEN_ID_KEY_NAME = "token_id";
+ private static final String HMAC_KEY_NAME = "hmac";
+
+ private final Errors error;
+ private final long issueTimestamp;
+ private final long expiryTimestamp;
+ private final long maxTimestamp;
+ private final String tokenId;
+ private final ByteBuffer hmac;
+ private final int throttleTimeMs;
+ private KafkaPrincipal owner;
+
+ private static final Schema TOKEN_CREATE_RESPONSE_V0 = new Schema(
+ ERROR_CODE,
+ new Field(OWNER_KEY_NAME, new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME), "token owner."),
+ new Field(ISSUE_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) when this token was generated."),
+ new Field(EXPIRY_TIMESTAMP_NAME, INT64, "timestamp (in msec) at which this token expires."),
+ new Field(MAX_TIMESTAMP_NAME, INT64, "max life time of this token."),
+ new Field(TOKEN_ID_KEY_NAME, STRING, "UUID to ensure uniqueness."),
+ new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token."),
+ THROTTLE_TIME_MS);
+
+ public CreateDelegationTokenResponse(int throttleTimeMs,
+ Errors error,
+ KafkaPrincipal owner,
+ long issueTimestamp,
+ long expiryTimestamp,
+ long maxTimestamp,
+ String tokenId,
+ ByteBuffer hmac) {
+ this.throttleTimeMs = throttleTimeMs;
+ this.error = error;
+ this.owner = owner;
+ this.issueTimestamp = issueTimestamp;
+ this.expiryTimestamp = expiryTimestamp;
+ this.maxTimestamp = maxTimestamp;
+ this.tokenId = tokenId;
+ this.hmac = hmac;
+ }
+
+ public CreateDelegationTokenResponse(int throttleTimeMs, Errors error, KafkaPrincipal owner) {
+ this(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {}));
+ }
+
+ public CreateDelegationTokenResponse(Struct struct) {
+ error = Errors.forCode(struct.get(ERROR_CODE));
+ Struct ownerStruct = (Struct) struct.get(OWNER_KEY_NAME);
+ String principalType = ownerStruct.get(PRINCIPAL_TYPE);
+ String principalName = ownerStruct.get(PRINCIPAL_NAME);
+ owner = new KafkaPrincipal(principalType, principalName);
+ issueTimestamp = struct.getLong(ISSUE_TIMESTAMP_KEY_NAME);
+ expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_NAME);
+ maxTimestamp = struct.getLong(MAX_TIMESTAMP_NAME);
+ tokenId = struct.getString(TOKEN_ID_KEY_NAME);
+ hmac = struct.getBytes(HMAC_KEY_NAME);
+ this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+ }
+
+ public static CreateDelegationTokenResponse parse(ByteBuffer buffer, short version) {
+ return new CreateDelegationTokenResponse(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
+ }
+
+ public static Schema[] schemaVersions() {
+ return new Schema[] {TOKEN_CREATE_RESPONSE_V0};
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version));
+ struct.set(ERROR_CODE, error.code());
+ Struct ownerStruct = struct.instance(OWNER_KEY_NAME);
+ ownerStruct.set(PRINCIPAL_TYPE, owner.getPrincipalType());
+ ownerStruct.set(PRINCIPAL_NAME, owner.getName());
+ struct.set(OWNER_KEY_NAME, ownerStruct);
+ struct.set(ISSUE_TIMESTAMP_KEY_NAME, issueTimestamp);
+ struct.set(EXPIRY_TIMESTAMP_NAME, expiryTimestamp);
+ struct.set(MAX_TIMESTAMP_NAME, maxTimestamp);
+ struct.set(TOKEN_ID_KEY_NAME, tokenId);
+ struct.set(HMAC_KEY_NAME, hmac);
+ struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+ return struct;
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ public KafkaPrincipal owner() {
+ return owner;
+ }
+
+ public long issueTimestamp() {
+ return issueTimestamp;
+ }
+
+ public long expiryTimestamp() {
+ return expiryTimestamp;
+ }
+
+ public long maxTimestamp() {
+ return maxTimestamp;
+ }
+
+ public String tokenId() {
+ return tokenId;
+ }
+
+ public byte[] hmacBytes() {
+ byte[] byteArray = new byte[hmac.remaining()];
+ hmac.get(byteArray);
+ return byteArray;
+ }
+
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
+ public boolean hasError() {
+ return this.error != Errors.NONE;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java
new file mode 100644
index 0000000..61c0055
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenRequest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
+
+public class DescribeDelegationTokenRequest extends AbstractRequest {
+ private static final String OWNER_KEY_NAME = "owners";
+
+ private final List<KafkaPrincipal> owners;
+
+ public static final Schema TOKEN_DESCRIBE_REQUEST_V0 = new Schema(
+ new Field(OWNER_KEY_NAME, ArrayOf.nullable(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)), "An array of token owners."));
+
+ public static class Builder extends AbstractRequest.Builder<DescribeDelegationTokenRequest> {
+ // describe token for the given list of owners, or null if we want to describe all tokens.
+ private final List<KafkaPrincipal> owners;
+
+ public Builder(List<KafkaPrincipal> owners) {
+ super(ApiKeys.DESCRIBE_DELEGATION_TOKEN);
+ this.owners = owners;
+ }
+
+ @Override
+ public DescribeDelegationTokenRequest build(short version) {
+ return new DescribeDelegationTokenRequest(version, owners);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(type: DescribeDelegationTokenRequest").
+ append(", owners=").append(owners).
+ append(")");
+ return bld.toString();
+ }
+ }
+
+ private DescribeDelegationTokenRequest(short version, List<KafkaPrincipal> owners) {
+ super(version);
+ this.owners = owners;
+ }
+
+ public DescribeDelegationTokenRequest(Struct struct, short versionId) {
+ super(versionId);
+
+ Object[] ownerArray = struct.getArray(OWNER_KEY_NAME);
+
+ if (ownerArray != null) {
+ owners = new ArrayList<>();
+ for (Object ownerObj : ownerArray) {
+ Struct ownerObjStruct = (Struct) ownerObj;
+ String principalType = ownerObjStruct.get(PRINCIPAL_TYPE);
+ String principalName = ownerObjStruct.get(PRINCIPAL_NAME);
+ owners.add(new KafkaPrincipal(principalType, principalName));
+ }
+ } else
+ owners = null;
+ }
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{TOKEN_DESCRIBE_REQUEST_V0};
+ }
+
+ @Override
+ protected Struct toStruct() {
+ short version = version();
+ Struct struct = new Struct(ApiKeys.DESCRIBE_DELEGATION_TOKEN.requestSchema(version));
+
+ if (owners == null) {
+ struct.set(OWNER_KEY_NAME, null);
+ } else {
+ Object[] ownersArray = new Object[owners.size()];
+
+ int i = 0;
+ for (KafkaPrincipal principal: owners) {
+ Struct ownerStruct = struct.instance(OWNER_KEY_NAME);
+ ownerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType());
+ ownerStruct.set(PRINCIPAL_NAME, principal.getName());
+ ownersArray[i++] = ownerStruct;
+ }
+
+ struct.set(OWNER_KEY_NAME, ownersArray);
+ }
+
+ return struct;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new DescribeDelegationTokenResponse(throttleTimeMs, Errors.forException(e));
+ }
+
+ public List<KafkaPrincipal> owners() {
+ return owners;
+ }
+
+ public boolean ownersListEmpty() {
+ return owners != null && owners.isEmpty();
+ }
+
+ public static DescribeDelegationTokenRequest parse(ByteBuffer buffer, short version) {
+ return new DescribeDelegationTokenRequest(ApiKeys.DESCRIBE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
new file mode 100644
index 0000000..dba29ea
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
+public class DescribeDelegationTokenResponse extends AbstractResponse {
+
+ private static final String TOKEN_DETAILS_KEY_NAME = "token_details";
+ private static final String ISSUE_TIMESTAMP_KEY_NAME = "issue_timestamp";
+ private static final String EXPIRY_TIMESTAMP_NAME = "expiry_timestamp";
+ private static final String MAX_TIMESTAMP_NAME = "max_timestamp";
+ private static final String TOKEN_ID_KEY_NAME = "token_id";
+ private static final String HMAC_KEY_NAME = "hmac";
+ private static final String OWNER_KEY_NAME = "owner";
+ private static final String RENEWERS_KEY_NAME = "renewers";
+
+ private final Errors error;
+ private final List<DelegationToken> tokens;
+ private final int throttleTimeMs;
+
+ private static final Schema TOKEN_DETAILS_V0 = new Schema(
+ new Field(OWNER_KEY_NAME, new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME), "token owner."),
+ new Field(ISSUE_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) when this token was generated."),
+ new Field(EXPIRY_TIMESTAMP_NAME, INT64, "timestamp (in msec) at which this token expires."),
+ new Field(MAX_TIMESTAMP_NAME, INT64, "max life time of this token."),
+ new Field(TOKEN_ID_KEY_NAME, STRING, "UUID to ensure uniqueness."),
+ new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be expired."),
+ new Field(RENEWERS_KEY_NAME, new ArrayOf(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)),
+ "An array of token renewers. Renewer is an Kafka PrincipalType and name string," +
+ " who is allowed to renew this token before the max lifetime expires."));
+
+ private static final Schema TOKEN_DESCRIBE_RESPONSE_V0 = new Schema(
+ ERROR_CODE,
+ new Field(TOKEN_DETAILS_KEY_NAME, new ArrayOf(TOKEN_DETAILS_V0)),
+ THROTTLE_TIME_MS);
+
+ public DescribeDelegationTokenResponse(int throttleTimeMs, Errors error, List<DelegationToken> tokens) {
+ this.throttleTimeMs = throttleTimeMs;
+ this.error = error;
+ this.tokens = tokens;
+ }
+
+ public DescribeDelegationTokenResponse(int throttleTimeMs, Errors error) {
+ this(throttleTimeMs, error, new ArrayList<DelegationToken>());
+ }
+
+ public DescribeDelegationTokenResponse(Struct struct) {
+ Object[] requestStructs = struct.getArray(TOKEN_DETAILS_KEY_NAME);
+ List<DelegationToken> tokens = new ArrayList<>();
+
+ for (Object requestStructObj : requestStructs) {
+ Struct singleRequestStruct = (Struct) requestStructObj;
+
+ Struct ownerStruct = (Struct) singleRequestStruct.get(OWNER_KEY_NAME);
+ KafkaPrincipal owner = new KafkaPrincipal(ownerStruct.get(PRINCIPAL_TYPE), ownerStruct.get(PRINCIPAL_NAME));
+ long issueTimestamp = singleRequestStruct.getLong(ISSUE_TIMESTAMP_KEY_NAME);
+ long expiryTimestamp = singleRequestStruct.getLong(EXPIRY_TIMESTAMP_NAME);
+ long maxTimestamp = singleRequestStruct.getLong(MAX_TIMESTAMP_NAME);
+ String tokenId = singleRequestStruct.getString(TOKEN_ID_KEY_NAME);
+ ByteBuffer hmac = singleRequestStruct.getBytes(HMAC_KEY_NAME);
+
+ Object[] renewerArray = singleRequestStruct.getArray(RENEWERS_KEY_NAME);
+ List<KafkaPrincipal> renewers = new ArrayList<>();
+ if (renewerArray != null) {
+ for (Object renewerObj : renewerArray) {
+ Struct renewerObjStruct = (Struct) renewerObj;
+ String principalType = renewerObjStruct.get(PRINCIPAL_TYPE);
+ String principalName = renewerObjStruct.get(PRINCIPAL_NAME);
+ renewers.add(new KafkaPrincipal(principalType, principalName));
+ }
+ }
+
+ TokenInformation tokenInfo = new TokenInformation(tokenId, owner, renewers, issueTimestamp, maxTimestamp, expiryTimestamp);
+
+ byte[] hmacBytes = new byte[hmac.remaining()];
+ hmac.get(hmacBytes);
+
+ DelegationToken tokenDetails = new DelegationToken(tokenInfo, hmacBytes);
+ tokens.add(tokenDetails);
+ }
+
+ this.tokens = tokens;
+ error = Errors.forCode(struct.get(ERROR_CODE));
+ this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+ }
+
+ public static DescribeDelegationTokenResponse parse(ByteBuffer buffer, short version) {
+ return new DescribeDelegationTokenResponse(ApiKeys.DESCRIBE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.DESCRIBE_DELEGATION_TOKEN.responseSchema(version));
+ List<Struct> tokenDetailsStructs = new ArrayList<>(tokens.size());
+
+ struct.set(ERROR_CODE, error.code());
+
+ for (DelegationToken token : tokens) {
+ TokenInformation tokenInfo = token.tokenInfo();
+ Struct singleRequestStruct = struct.instance(TOKEN_DETAILS_KEY_NAME);
+ Struct ownerStruct = singleRequestStruct.instance(OWNER_KEY_NAME);
+ ownerStruct.set(PRINCIPAL_TYPE, tokenInfo.owner().getPrincipalType());
+ ownerStruct.set(PRINCIPAL_NAME, tokenInfo.owner().getName());
+ singleRequestStruct.set(OWNER_KEY_NAME, ownerStruct);
+ singleRequestStruct.set(ISSUE_TIMESTAMP_KEY_NAME, tokenInfo.issueTimestamp());
+ singleRequestStruct.set(EXPIRY_TIMESTAMP_NAME, tokenInfo.expiryTimestamp());
+ singleRequestStruct.set(MAX_TIMESTAMP_NAME, tokenInfo.maxTimestamp());
+ singleRequestStruct.set(TOKEN_ID_KEY_NAME, tokenInfo.tokenId());
+ singleRequestStruct.set(HMAC_KEY_NAME, ByteBuffer.wrap(token.hmac()));
+
+ Object[] renewersArray = new Object[tokenInfo.renewers().size()];
+
+ int i = 0;
+ for (KafkaPrincipal principal: tokenInfo.renewers()) {
+ Struct renewerStruct = singleRequestStruct.instance(RENEWERS_KEY_NAME);
+ renewerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType());
+ renewerStruct.set(PRINCIPAL_NAME, principal.getName());
+ renewersArray[i++] = renewerStruct;
+ }
+
+ singleRequestStruct.set(RENEWERS_KEY_NAME, renewersArray);
+
+ tokenDetailsStructs.add(singleRequestStruct);
+ }
+ struct.set(TOKEN_DETAILS_KEY_NAME, tokenDetailsStructs.toArray());
+ struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+
+ return struct;
+ }
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{TOKEN_DESCRIBE_RESPONSE_V0};
+ }
+
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ public List<DelegationToken> tokens() {
+ return tokens;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
new file mode 100644
index 0000000..0d43440
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
+public class ExpireDelegationTokenRequest extends AbstractRequest {
+
+ private static final String HMAC_KEY_NAME = "hmac";
+ private static final String EXPIRY_TIME_PERIOD_KEY_NAME = "expiry_time_period";
+ private final ByteBuffer hmac;
+ private final long expiryTimePeriod;
+
+ private static final Schema TOKEN_EXPIRE_REQUEST_V0 = new Schema(
+ new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be expired."),
+ new Field(EXPIRY_TIME_PERIOD_KEY_NAME, INT64, "expiry time period in milli seconds."));
+
+ private ExpireDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) {
+ super(version);
+
+ this.hmac = hmac;
+ this.expiryTimePeriod = renewTimePeriod;
+ }
+
+ public ExpireDelegationTokenRequest(Struct struct, short versionId) {
+ super(versionId);
+
+ hmac = struct.getBytes(HMAC_KEY_NAME);
+ expiryTimePeriod = struct.getLong(EXPIRY_TIME_PERIOD_KEY_NAME);
+ }
+
+ public static ExpireDelegationTokenRequest parse(ByteBuffer buffer, short version) {
+ return new ExpireDelegationTokenRequest(ApiKeys.EXPIRE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
+ }
+
+ public static Schema[] schemaVersions() {
+ return new Schema[] {TOKEN_EXPIRE_REQUEST_V0};
+ }
+
+ @Override
+ protected Struct toStruct() {
+ short version = version();
+ Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.requestSchema(version));
+
+ struct.set(HMAC_KEY_NAME, hmac);
+ struct.set(EXPIRY_TIME_PERIOD_KEY_NAME, expiryTimePeriod);
+
+ return struct;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new ExpireDelegationTokenResponse(throttleTimeMs, Errors.forException(e));
+ }
+
+ public ByteBuffer hmac() {
+ return hmac;
+ }
+
+ public long expiryTimePeriod() {
+ return expiryTimePeriod;
+ }
+
+ public static class Builder extends AbstractRequest.Builder<ExpireDelegationTokenRequest> {
+ private final ByteBuffer hmac;
+ private final long expiryTimePeriod;
+
+ public Builder(ByteBuffer hmac, long expiryTimePeriod) {
+ super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
+ this.hmac = hmac;
+ this.expiryTimePeriod = expiryTimePeriod;
+ }
+
+ @Override
+ public ExpireDelegationTokenRequest build(short version) {
+ return new ExpireDelegationTokenRequest(version, hmac, expiryTimePeriod);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(type: ExpireDelegationTokenRequest").
+ append(", hmac=").append(hmac).
+ append(", expiryTimePeriod=").append(expiryTimePeriod).
+ append(")");
+ return bld.toString();
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
new file mode 100644
index 0000000..f7e0ec4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
+public class ExpireDelegationTokenResponse extends AbstractResponse {
+
+ private static final String EXPIRY_TIMESTAMP_KEY_NAME = "expiry_timestamp";
+
+ private final Errors error;
+ private final long expiryTimestamp;
+ private final int throttleTimeMs;
+
+ private static final Schema TOKEN_EXPIRE_RESPONSE_V0 = new Schema(
+ ERROR_CODE,
+ new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
+ THROTTLE_TIME_MS);
+
+ public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
+ this.throttleTimeMs = throttleTimeMs;
+ this.error = error;
+ this.expiryTimestamp = expiryTimestamp;
+ }
+
+ public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error) {
+ this(throttleTimeMs, error, -1);
+ }
+
+ public ExpireDelegationTokenResponse(Struct struct) {
+ error = Errors.forCode(struct.get(ERROR_CODE));
+ this.expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_KEY_NAME);
+ this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+ }
+
+ public static ExpireDelegationTokenResponse parse(ByteBuffer buffer, short version) {
+ return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
+ }
+
+ public static Schema[] schemaVersions() {
+ return new Schema[] {TOKEN_EXPIRE_RESPONSE_V0};
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ public long expiryTimestamp() {
+ return expiryTimestamp;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version));
+
+ struct.set(ERROR_CODE, error.code());
+ struct.set(EXPIRY_TIMESTAMP_KEY_NAME, expiryTimestamp);
+ struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+
+ return struct;
+ }
+
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
new file mode 100644
index 0000000..4a4b762
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
+public class RenewDelegationTokenRequest extends AbstractRequest {
+
+ private static final String HMAC_KEY_NAME = "hmac";
+ private static final String RENEW_TIME_PERIOD_KEY_NAME = "renew_time_period";
+ private final ByteBuffer hmac;
+ private final long renewTimePeriod;
+
+ public static final Schema TOKEN_RENEW_REQUEST_V0 = new Schema(
+ new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be renewed."),
+ new Field(RENEW_TIME_PERIOD_KEY_NAME, INT64, "Renew time period in milli seconds."));
+
+ private RenewDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) {
+ super(version);
+
+ this.hmac = hmac;
+ this.renewTimePeriod = renewTimePeriod;
+ }
+
+ public RenewDelegationTokenRequest(Struct struct, short versionId) {
+ super(versionId);
+
+ hmac = struct.getBytes(HMAC_KEY_NAME);
+ renewTimePeriod = struct.getLong(RENEW_TIME_PERIOD_KEY_NAME);
+ }
+
+ public static RenewDelegationTokenRequest parse(ByteBuffer buffer, short version) {
+ return new RenewDelegationTokenRequest(ApiKeys.RENEW_DELEGATION_TOKEN.parseRequest(version, buffer), version);
+ }
+
+ public static Schema[] schemaVersions() {
+ return new Schema[] {TOKEN_RENEW_REQUEST_V0};
+ }
+
+ @Override
+ protected Struct toStruct() {
+ short version = version();
+ Struct struct = new Struct(ApiKeys.RENEW_DELEGATION_TOKEN.requestSchema(version));
+
+ struct.set(HMAC_KEY_NAME, hmac);
+ struct.set(RENEW_TIME_PERIOD_KEY_NAME, renewTimePeriod);
+
+ return struct;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new RenewDelegationTokenResponse(throttleTimeMs, Errors.forException(e));
+ }
+
+ public ByteBuffer hmac() {
+ return hmac;
+ }
+
+ public long renewTimePeriod() {
+ return renewTimePeriod;
+ }
+
+ public static class Builder extends AbstractRequest.Builder<RenewDelegationTokenRequest> {
+ private final ByteBuffer hmac;
+ private final long renewTimePeriod;
+
+ public Builder(ByteBuffer hmac, long renewTimePeriod) {
+ super(ApiKeys.RENEW_DELEGATION_TOKEN);
+ this.hmac = hmac;
+ this.renewTimePeriod = renewTimePeriod;
+ }
+
+ @Override
+ public RenewDelegationTokenRequest build(short version) {
+ return new RenewDelegationTokenRequest(version, hmac, renewTimePeriod);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(type: RenewDelegationTokenRequest").
+ append(", hmac=").append(hmac).
+ append(", renewTimePeriod=").append(renewTimePeriod).
+ append(")");
+ return bld.toString();
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
new file mode 100644
index 0000000..1885b48
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
+public class RenewDelegationTokenResponse extends AbstractResponse {
+
+ private static final String EXPIRY_TIMESTAMP_KEY_NAME = "expiry_timestamp";
+
+ private final Errors error;
+ private final long expiryTimestamp;
+ private final int throttleTimeMs;
+
+ private static final Schema TOKEN_RENEW_RESPONSE_V0 = new Schema(
+ ERROR_CODE,
+ new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
+ THROTTLE_TIME_MS);
+
+ public RenewDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
+ this.throttleTimeMs = throttleTimeMs;
+ this.error = error;
+ this.expiryTimestamp = expiryTimestamp;
+ }
+
+ public RenewDelegationTokenResponse(int throttleTimeMs, Errors error) {
+ this(throttleTimeMs, error, -1);
+ }
+
+ public RenewDelegationTokenResponse(Struct struct) {
+ error = Errors.forCode(struct.get(ERROR_CODE));
+ expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_KEY_NAME);
+ this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+ }
+
+ public static RenewDelegationTokenResponse parse(ByteBuffer buffer, short version) {
+ return new RenewDelegationTokenResponse(ApiKeys.RENEW_DELEGATION_TOKEN.responseSchema(version).read(buffer));
+ }
+
+ public static Schema[] schemaVersions() {
+ return new Schema[] {TOKEN_RENEW_RESPONSE_V0};
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.RENEW_DELEGATION_TOKEN.responseSchema(version));
+
+ struct.set(ERROR_CODE, error.code());
+ struct.set(EXPIRY_TIMESTAMP_KEY_NAME, expiryTimestamp);
+ struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+
+ return struct;
+ }
+
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ public long expiryTimestamp() {
+ return expiryTimestamp;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
index d83382d..2ce653f 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
@@ -58,7 +58,12 @@ public enum ResourceType {
/**
* A transactional ID.
*/
- TRANSACTIONAL_ID((byte) 5);
+ TRANSACTIONAL_ID((byte) 5),
+
+ /**
+ * A token ID.
+ */
+ DELEGATION_TOKEN((byte) 6);
private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
index ed3c956..10bf76d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -47,10 +47,16 @@ public class KafkaPrincipal implements Principal {
private final String principalType;
private final String name;
+ private boolean tokenAuthenticated;
public KafkaPrincipal(String principalType, String name) {
+ this(principalType, name, false);
+ }
+
+ public KafkaPrincipal(String principalType, String name, boolean tokenauth) {
this.principalType = requireNonNull(principalType, "Principal type cannot be null");
this.name = requireNonNull(name, "Principal name cannot be null");
+ this.tokenAuthenticated = tokenauth;
}
/**
@@ -83,8 +89,9 @@ public class KafkaPrincipal implements Principal {
@Override
public int hashCode() {
- int result = principalType.hashCode();
- result = 31 * result + name.hashCode();
+ int result = principalType != null ? principalType.hashCode() : 0;
+ result = 31 * result + (name != null ? name.hashCode() : 0);
+ result = 31 * result + (tokenAuthenticated ? 1 : 0);
return result;
}
@@ -97,4 +104,8 @@ public class KafkaPrincipal implements Principal {
return principalType;
}
+ public boolean tokenAuthenticated() {
+ return tokenAuthenticated;
+ }
}
+
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
index 30b0a3e..7404238 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
@@ -118,8 +119,13 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
SaslServer saslServer = ((SaslAuthenticationContext) context).server();
if (SaslConfigs.GSSAPI_MECHANISM.equals(saslServer.getMechanismName()))
return applyKerberosShortNamer(saslServer.getAuthorizationID());
- else
- return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
+ else {
+ Boolean isTokenAuthenticated = (Boolean) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG);
+ if (isTokenAuthenticated != null && isTokenAuthenticated)
+ return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(), true);
+ else
+ return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
+ }
} else {
throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 4756387..de96cef 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -28,6 +28,7 @@ import javax.security.sasl.RealmCallback;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.scram.DelegationTokenAuthenticationCallback;
/**
* Callback handler for Sasl clients. The callbacks required for the SASL mechanism
@@ -80,7 +81,12 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler {
ac.setAuthorized(authId.equals(authzId));
if (ac.isAuthorized())
ac.setAuthorizedID(authzId);
- } else {
+ } else if (callback instanceof DelegationTokenAuthenticationCallback) {
+ DelegationTokenAuthenticationCallback tc = (DelegationTokenAuthenticationCallback) callback;
+ if (!isKerberos && subject != null && !subject.getPublicCredentials(Boolean.class).isEmpty()) {
+ tc.tokenauth(subject.getPublicCredentials(Boolean.class).iterator().next());
+ }
+ } else {
throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 1a2aaf4..6fcca57 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -56,6 +56,7 @@ import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.apache.kafka.common.security.scram.ScramServerCallbackHandler;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSException;
@@ -107,6 +108,7 @@ public class SaslServerAuthenticator implements Authenticator {
private final Set<String> enabledMechanisms;
private final Map<String, ?> configs;
private final KafkaPrincipalBuilder principalBuilder;
+ private final DelegationTokenCache tokenCache;
// Current SASL state
private SaslState saslState = SaslState.INITIAL_REQUEST;
@@ -132,7 +134,8 @@ public class SaslServerAuthenticator implements Authenticator {
CredentialCache credentialCache,
ListenerName listenerName,
SecurityProtocol securityProtocol,
- TransportLayer transportLayer) throws IOException {
+ TransportLayer transportLayer,
+ DelegationTokenCache tokenCache) throws IOException {
if (subject == null)
throw new IllegalArgumentException("subject cannot be null");
this.connectionId = connectionId;
@@ -142,7 +145,7 @@ public class SaslServerAuthenticator implements Authenticator {
this.listenerName = listenerName;
this.securityProtocol = securityProtocol;
this.enableKafkaSaslAuthenticateHeaders = false;
-
+ this.tokenCache = tokenCache;
this.transportLayer = transportLayer;
this.configs = configs;
@@ -162,7 +165,7 @@ public class SaslServerAuthenticator implements Authenticator {
if (!ScramMechanism.isScram(mechanism))
callbackHandler = new SaslServerCallbackHandler(jaasContext);
else
- callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class));
+ callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class), tokenCache);
callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
saslServer = createSaslKerberosServer(callbackHandler, configs, subject);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
similarity index 74%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
index 931210a..df6e849 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
@@ -14,18 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.common.security.scram;
import javax.security.auth.callback.Callback;
-public class ScramCredentialCallback implements Callback {
- private ScramCredential scramCredential;
+public class DelegationTokenAuthenticationCallback implements Callback {
+ private boolean tokenauth;
- public ScramCredential scramCredential() {
- return scramCredential;
+ public String extension() {
+ return ScramLoginModule.TOKEN_AUTH_CONFIG + "=" + Boolean.toString(tokenauth);
}
- public void scramCredential(ScramCredential scramCredential) {
- this.scramCredential = scramCredential;
+ public void tokenauth(Boolean tokenauth) {
+ this.tokenauth = tokenauth;
}
}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
index 931210a..7f3601c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
@@ -20,12 +20,36 @@ import javax.security.auth.callback.Callback;
public class ScramCredentialCallback implements Callback {
private ScramCredential scramCredential;
+ private boolean tokenAuthenticated;
+ private String tokenOwner;
+ private String mechanism;
+
+ public ScramCredentialCallback(boolean tokenAuthenticated, String mechanism) {
+ this.tokenAuthenticated = tokenAuthenticated;
+ this.mechanism = mechanism;
+ }
public ScramCredential scramCredential() {
return scramCredential;
}
+ public boolean tokenauth() {
+ return tokenAuthenticated;
+ }
+
public void scramCredential(ScramCredential scramCredential) {
this.scramCredential = scramCredential;
}
+
+ public void tokenOwner(String tokenOwner) {
+ this.tokenOwner = tokenOwner;
+ }
+
+ public String tokenOwner() {
+ return tokenOwner;
+ }
+
+ public String mechanism() {
+ return mechanism;
+ }
}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
index c087a32..8000f4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
@@ -27,6 +27,7 @@ public class ScramLoginModule implements LoginModule {
private static final String USERNAME_CONFIG = "username";
private static final String PASSWORD_CONFIG = "password";
+ public static final String TOKEN_AUTH_CONFIG = "tokenauth";
static {
ScramSaslClientProvider.initialize();
@@ -41,6 +42,9 @@ public class ScramLoginModule implements LoginModule {
String password = (String) options.get(PASSWORD_CONFIG);
if (password != null)
subject.getPrivateCredentials().add(password);
+
+ Boolean useTokenAuthentication = "true".equalsIgnoreCase((String) options.get(TOKEN_AUTH_CONFIG));
+ subject.getPublicCredentials().add(useTokenAuthentication);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
index 1ad7266..e697ea5 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.security.scram;
import org.apache.kafka.common.utils.Base64;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -64,16 +66,18 @@ public class ScramMessages {
*/
public static class ClientFirstMessage extends AbstractScramMessage {
private static final Pattern PATTERN = Pattern.compile(String.format(
- "n,(a=(?<authzid>%s))?,%sn=(?<saslname>%s),r=(?<nonce>%s)%s",
+ "n,(a=(?<authzid>%s))?,%sn=(?<saslname>%s),r=(?<nonce>%s)(?<extensions>%s)",
SASLNAME,
RESERVED,
SASLNAME,
PRINTABLE,
EXTENSIONS));
+
private final String saslName;
private final String nonce;
private final String authorizationId;
+ private final String extensions;
public ClientFirstMessage(byte[] messageBytes) throws SaslException {
String message = toMessage(messageBytes);
Matcher matcher = PATTERN.matcher(message);
@@ -83,10 +87,13 @@ public class ScramMessages {
this.authorizationId = authzid != null ? authzid : "";
this.saslName = matcher.group("saslname");
this.nonce = matcher.group("nonce");
+ String extString = matcher.group("extensions");
+ this.extensions = extString.startsWith(",") ? extString.substring(1) : extString;
}
- public ClientFirstMessage(String saslName, String nonce) {
+ public ClientFirstMessage(String saslName, String nonce, String extensions) {
this.saslName = saslName;
this.nonce = nonce;
+ this.extensions = extensions;
this.authorizationId = ""; // Optional authzid not specified in gs2-header
}
public String saslName() {
@@ -101,8 +108,29 @@ public class ScramMessages {
public String gs2Header() {
return "n," + authorizationId + ",";
}
+ public String extensions() {
+ return extensions;
+ }
+
+ public Map<String, String> extensionsAsMap() {
+ Map<String, String> map = new HashMap<>();
+
+ if (extensions.isEmpty())
+ return map;
+
+ String[] attrvals = extensions.split(",");
+ for (String attrval: attrvals) {
+ String[] array = attrval.split("=", 2);
+ map.put(array[0], array[1]);
+ }
+ return map;
+ }
+
public String clientFirstMessageBare() {
- return String.format("n=%s,r=%s", saslName, nonce);
+ if (extensions.isEmpty())
+ return String.format("n=%s,r=%s", saslName, nonce);
+ else
+ return String.format("n=%s,r=%s,%s", saslName, nonce, extensions);
}
String toMessage() {
return gs2Header() + clientFirstMessageBare();
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
index e5b0f84..6b66f5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
@@ -97,14 +97,18 @@ public class ScramSaslClient implements SaslClient {
throw new SaslException("Expected empty challenge");
clientNonce = formatter.secureRandomString();
NameCallback nameCallback = new NameCallback("Name:");
+ DelegationTokenAuthenticationCallback tokenAuthCallback = new DelegationTokenAuthenticationCallback();
+
try {
- callbackHandler.handle(new Callback[]{nameCallback});
+ callbackHandler.handle(new Callback[]{nameCallback, tokenAuthCallback});
} catch (IOException | UnsupportedCallbackException e) {
throw new SaslException("User name could not be obtained", e);
}
+
String username = nameCallback.getName();
String saslName = formatter.saslName(username);
- this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce);
+ String extension = tokenAuthCallback.extension();
+ this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extension);
setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
return clientFirstMessage.toBytes();
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
index b250e15..94b92b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
@@ -66,6 +66,8 @@ public class ScramSaslServer implements SaslServer {
private ClientFirstMessage clientFirstMessage;
private ServerFirstMessage serverFirstMessage;
private ScramCredential scramCredential;
+ private boolean tokenAuthentication;
+ private String tokenOwner;
public ScramSaslServer(ScramMechanism mechanism, Map<String, ?> props, CallbackHandler callbackHandler) throws NoSuchAlgorithmException {
this.mechanism = mechanism;
@@ -93,9 +95,14 @@ public class ScramSaslServer implements SaslServer {
try {
String saslName = clientFirstMessage.saslName();
this.username = formatter.username(saslName);
+ Map<String, String> extensions = clientFirstMessage.extensionsAsMap();
+ this.tokenAuthentication = "true".equalsIgnoreCase(extensions.get(ScramLoginModule.TOKEN_AUTH_CONFIG));
NameCallback nameCallback = new NameCallback("username", username);
- ScramCredentialCallback credentialCallback = new ScramCredentialCallback();
+ ScramCredentialCallback credentialCallback = new ScramCredentialCallback(tokenAuthentication, getMechanismName());
callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
+ this.tokenOwner = credentialCallback.tokenOwner();
+ if (tokenAuthentication && tokenOwner == null)
+ throw new SaslException("Token Authentication failed: Invalid tokenId : " + username);
this.scramCredential = credentialCallback.scramCredential();
if (scramCredential == null)
throw new SaslException("Authentication failed: Invalid user credentials");
@@ -143,6 +150,10 @@ public class ScramSaslServer implements SaslServer {
public String getAuthorizationID() {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
+
+ if (tokenAuthentication)
+ return tokenOwner; // return token owner as principal for this session
+
return username;
}
@@ -155,7 +166,11 @@ public class ScramSaslServer implements SaslServer {
public Object getNegotiatedProperty(String propName) {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
- return null;
+
+ if (ScramLoginModule.TOKEN_AUTH_CONFIG.equals(propName))
+ return tokenAuthentication;
+ else
+ return null;
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
index d3b245d..a064e8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
@@ -27,13 +27,17 @@ import javax.security.auth.callback.UnsupportedCallbackException;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.authenticator.AuthCallbackHandler;
import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
public class ScramServerCallbackHandler implements AuthCallbackHandler {
private final CredentialCache.Cache<ScramCredential> credentialCache;
+ private final DelegationTokenCache tokenCache;
- public ScramServerCallbackHandler(CredentialCache.Cache<ScramCredential> credentialCache) {
+ public ScramServerCallbackHandler(CredentialCache.Cache<ScramCredential> credentialCache,
+ DelegationTokenCache tokenCache) {
this.credentialCache = credentialCache;
+ this.tokenCache = tokenCache;
}
@Override
@@ -42,9 +46,14 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler {
for (Callback callback : callbacks) {
if (callback instanceof NameCallback)
username = ((NameCallback) callback).getDefaultName();
- else if (callback instanceof ScramCredentialCallback)
- ((ScramCredentialCallback) callback).scramCredential(credentialCache.get(username));
- else
+ else if (callback instanceof ScramCredentialCallback) {
+ ScramCredentialCallback sc = (ScramCredentialCallback) callback;
+ if (sc.tokenauth()) {
+ sc.scramCredential(tokenCache.credential(sc.mechanism(), username));
+ sc.tokenOwner(tokenCache.owner(username));
+ } else
+ sc.scramCredential(credentialCache.get(username));
+ } else
throw new UnsupportedCallbackException(callback);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
new file mode 100644
index 0000000..05ccbda
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.token.delegation;
+
+import org.apache.kafka.common.utils.Base64;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+public class DelegationToken {
+ private TokenInformation tokenInformation;
+ private byte[] hmac;
+
+ public DelegationToken(TokenInformation tokenInformation, byte[] hmac) {
+ this.tokenInformation = tokenInformation;
+ this.hmac = hmac;
+ }
+
+ public TokenInformation tokenInfo() {
+ return tokenInformation;
+ }
+
+ public byte[] hmac() {
+ return hmac;
+ }
+
+ public String hmacAsBase64String() {
+ return Base64.encoder().encodeToString(hmac);
+ }
+
+ public ByteBuffer hmacBuffer() {
+ return ByteBuffer.wrap(hmac);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DelegationToken token = (DelegationToken) o;
+
+ if (tokenInformation != null ? !tokenInformation.equals(token.tokenInformation) : token.tokenInformation != null) {
+ return false;
+ }
+ return Arrays.equals(hmac, token.hmac);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = tokenInformation != null ? tokenInformation.hashCode() : 0;
+ result = 31 * result + Arrays.hashCode(hmac);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "DelegationToken{" +
+ "tokenInformation=" + tokenInformation +
+ ", hmac=[*******]" +
+ '}';
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
new file mode 100644
index 0000000..78575b8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.token.delegation;
+
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.ScramCredentialUtils;
+import org.apache.kafka.common.security.scram.ScramMechanism;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DelegationTokenCache {
+
+ private CredentialCache credentialCache = new CredentialCache();
+ //Cache to hold all the tokens
+ private Map<String, TokenInformation> tokenCache = new ConcurrentHashMap<>();
+ //Cache to hold hmac->tokenId mapping. This is required for renew, expire requests
+ private Map<String, String> hmacIDCache = new ConcurrentHashMap<>();
+
+ public DelegationTokenCache(Collection<String> scramMechanisms) {
+ //Create caches for scramMechanisms
+ ScramCredentialUtils.createCache(credentialCache, scramMechanisms);
+ }
+
+ public ScramCredential credential(String mechanism, String tokenId) {
+ CredentialCache.Cache<ScramCredential> cache = credentialCache.cache(mechanism, ScramCredential.class);
+ return cache == null ? null : cache.get(tokenId);
+ }
+
+ public String owner(String tokenId) {
+ TokenInformation tokenInfo = tokenCache.get(tokenId);
+ return tokenInfo == null ? null : tokenInfo.owner().getName();
+ }
+
+ public void updateCache(DelegationToken token, Map<String, ScramCredential> scramCredentialMap) {
+ //Update TokenCache
+ String tokenId = token.tokenInfo().tokenId();
+ addToken(tokenId, token.tokenInfo());
+ String hmac = token.hmacAsBase64String();
+ //Update Scram Credentials
+ updateCredentials(tokenId, scramCredentialMap);
+ //Update hmac-id cache
+ hmacIDCache.put(hmac, tokenId);
+ }
+
+
+ public void removeCache(String tokenId) {
+ removeToken(tokenId);
+ updateCredentials(tokenId, new HashMap<String, ScramCredential>());
+ }
+
+ public TokenInformation tokenForHmac(String base64hmac) {
+ String tokenId = hmacIDCache.get(base64hmac);
+ return tokenId == null ? null : tokenCache.get(tokenId);
+ }
+
+ public TokenInformation addToken(String tokenId, TokenInformation tokenInfo) {
+ return tokenCache.put(tokenId, tokenInfo);
+ }
+
+ public void removeToken(String tokenId) {
+ TokenInformation tokenInfo = tokenCache.remove(tokenId);
+ if (tokenInfo != null) {
+ hmacIDCache.remove(tokenInfo.tokenId());
+ }
+ }
+
+ public Collection<TokenInformation> tokens() {
+ return tokenCache.values();
+ }
+
+ public TokenInformation token(String tokenId) {
+ return tokenCache.get(tokenId);
+ }
+
+ public CredentialCache.Cache<ScramCredential> credentialCache(String mechanism) {
+ return credentialCache.cache(mechanism, ScramCredential.class);
+ }
+
+ private void updateCredentials(String tokenId, Map<String, ScramCredential> scramCredentialMap) {
+ for (String mechanism : ScramMechanism.mechanismNames()) {
+ CredentialCache.Cache<ScramCredential> cache = credentialCache.cache(mechanism, ScramCredential.class);
+ if (cache != null) {
+ ScramCredential credential = scramCredentialMap.get(mechanism);
+ if (credential == null) {
+ cache.remove(tokenId);
+ } else {
+ cache.put(tokenId, credential);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
new file mode 100644
index 0000000..1d500d2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.token.delegation;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class TokenInformation {
+
+ private KafkaPrincipal owner;
+ private Collection<KafkaPrincipal> renewers;
+ private long issueTimestamp;
+ private long maxTimestamp;
+ private long expiryTimestamp;
+ private String tokenId;
+
+ public TokenInformation(String tokenId, KafkaPrincipal owner, Collection<KafkaPrincipal> renewers,
+ long issueTimestamp, long maxTimestamp, long expiryTimestamp) {
+ this.tokenId = tokenId;
+ this.owner = owner;
+ this.renewers = renewers;
+ this.issueTimestamp = issueTimestamp;
+ this.maxTimestamp = maxTimestamp;
+ this.expiryTimestamp = expiryTimestamp;
+ }
+
+ public KafkaPrincipal owner() {
+ return owner;
+ }
+
+ public String ownerAsString() {
+ return owner.toString();
+ }
+
+ public Collection<KafkaPrincipal> renewers() {
+ return renewers;
+ }
+
+ public Collection<String> renewersAsString() {
+ Collection<String> renewerList = new ArrayList<>();
+ for (KafkaPrincipal renewer : renewers) {
+ renewerList.add(renewer.toString());
+ }
+ return renewerList;
+ }
+
+ public long issueTimestamp() {
+ return issueTimestamp;
+ }
+
+ public long expiryTimestamp() {
+ return expiryTimestamp;
+ }
+
+ public void setExpiryTimestamp(long expiryTimestamp) {
+ this.expiryTimestamp = expiryTimestamp;
+ }
+
+ public String tokenId() {
+ return tokenId;
+ }
+
+ public long maxTimestamp() {
+ return maxTimestamp;
+ }
+
+ public boolean ownerOrRenewer(KafkaPrincipal principal) {
+ if (owner.equals(principal) || renewers.contains(principal))
+ return true;
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "TokenInformation{" +
+ "owner=" + owner +
+ ", renewers=" + renewers +
+ ", issueTimestamp=" + issueTimestamp +
+ ", maxTimestamp=" + maxTimestamp +
+ ", expiryTimestamp=" + expiryTimestamp +
+ ", tokenId='" + tokenId + '\'' +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TokenInformation that = (TokenInformation) o;
+
+ if (issueTimestamp != that.issueTimestamp) {
+ return false;
+ }
+ if (maxTimestamp != that.maxTimestamp) {
+ return false;
+ }
+ if (owner != null ? !owner.equals(that.owner) : that.owner != null) {
+ return false;
+ }
+ if (renewers != null ? !renewers.equals(that.renewers) : that.renewers != null) {
+ return false;
+ }
+ return tokenId != null ? tokenId.equals(that.tokenId) : that.tokenId == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = owner != null ? owner.hashCode() : 0;
+ result = 31 * result + (renewers != null ? renewers.hashCode() : 0);
+ result = 31 * result + (int) (issueTimestamp ^ (issueTimestamp >>> 32));
+ result = 31 * result + (int) (maxTimestamp ^ (maxTimestamp >>> 32));
+ result = 31 * result + (tokenId != null ? tokenId.hashCode() : 0);
+ return result;
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index e27a31d..ff8929a 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -44,6 +44,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+
/**
* Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
* with the configured security protocol.
@@ -63,6 +65,7 @@ public class NioEchoServer extends Thread {
private final CredentialCache credentialCache;
private final Metrics metrics;
private int numSent = 0;
+ private final DelegationTokenCache tokenCache;
public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache) throws Exception {
@@ -75,10 +78,11 @@ public class NioEchoServer extends Thread {
this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
this.credentialCache = credentialCache;
+ this.tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames());
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL)
ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
if (channelBuilder == null)
- channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache);
+ channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache, tokenCache);
this.metrics = new Metrics();
this.selector = new Selector(5000, metrics, new MockTime(), "MetricGroup", channelBuilder, new LogContext());
acceptorThread = new AcceptorThread();
@@ -92,6 +96,10 @@ public class NioEchoServer extends Thread {
return credentialCache;
}
+ public DelegationTokenCache tokenCache() {
+ return tokenCache;
+ }
+
@SuppressWarnings("deprecation")
public double metricValue(String name) {
for (Map.Entry<MetricName, KafkaMetric> entry : metrics.metrics().entrySet()) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 750fd01..a64cb45 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -72,7 +72,7 @@ public class SaslChannelBuilderTest {
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, new ListenerName("PLAIN"),
- "PLAIN", true, null);
+ "PLAIN", true, null, null);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
index c4b3fc4..0ded0de 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
@@ -26,7 +26,8 @@ public class ProtoUtilsTest {
public void testDelayedAllocationSchemaDetection() throws Exception {
//verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
for (ApiKeys key : ApiKeys.values()) {
- if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP || key == ApiKeys.SASL_AUTHENTICATE) {
+ if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP || key == ApiKeys.SASL_AUTHENTICATE
+ || key == ApiKeys.EXPIRE_DELEGATION_TOKEN || key == ApiKeys.RENEW_DELEGATION_TOKEN) {
assertTrue(key + " should require delayed allocation", key.requiresDelayedAllocation);
} else {
assertFalse(key + " should not require delayed allocation", key.requiresDelayedAllocation);
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index edd1314..2771f2f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -49,6 +49,10 @@ import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourceFilter;
import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Utils;
import org.junit.Test;
@@ -62,6 +66,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -245,6 +250,18 @@ public class RequestResponseTest {
checkRequest(createCreatePartitionsRequestWithAssignments());
checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException());
checkResponse(createCreatePartitionsResponse(), 0);
+ checkRequest(createCreateTokenRequest());
+ checkErrorResponse(createCreateTokenRequest(), new UnknownServerException());
+ checkResponse(createCreateTokenResponse(), 0);
+ checkRequest(createDescribeTokenRequest());
+ checkErrorResponse(createDescribeTokenRequest(), new UnknownServerException());
+ checkResponse(createDescribeTokenResponse(), 0);
+ checkRequest(createExpireTokenRequest());
+ checkErrorResponse(createExpireTokenRequest(), new UnknownServerException());
+ checkResponse(createExpireTokenResponse(), 0);
+ checkRequest(createRenewTokenRequest());
+ checkErrorResponse(createRenewTokenRequest(), new UnknownServerException());
+ checkResponse(createRenewTokenResponse(), 0);
}
@Test
@@ -1082,4 +1099,57 @@ public class RequestResponseTest {
return new CreatePartitionsResponse(42, results);
}
+ private CreateDelegationTokenRequest createCreateTokenRequest() {
+ List<KafkaPrincipal> renewers = new ArrayList<>();
+ renewers.add(SecurityUtils.parseKafkaPrincipal("User:user1"));
+ renewers.add(SecurityUtils.parseKafkaPrincipal("User:user2"));
+ return new CreateDelegationTokenRequest.Builder(renewers, System.currentTimeMillis()).build();
+ }
+
+ private CreateDelegationTokenResponse createCreateTokenResponse() {
+ return new CreateDelegationTokenResponse(20, Errors.NONE, SecurityUtils.parseKafkaPrincipal("User:user1"), System.currentTimeMillis(),
+ System.currentTimeMillis(), System.currentTimeMillis(), "token1", ByteBuffer.wrap("test".getBytes()));
+ }
+
+ private RenewDelegationTokenRequest createRenewTokenRequest() {
+ return new RenewDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+ }
+
+ private RenewDelegationTokenResponse createRenewTokenResponse() {
+ return new RenewDelegationTokenResponse(20, Errors.NONE, System.currentTimeMillis());
+ }
+
+ private ExpireDelegationTokenRequest createExpireTokenRequest() {
+ return new ExpireDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+ }
+
+ private ExpireDelegationTokenResponse createExpireTokenResponse() {
+ return new ExpireDelegationTokenResponse(20, Errors.NONE, System.currentTimeMillis());
+ }
+
+ private DescribeDelegationTokenRequest createDescribeTokenRequest() {
+ List<KafkaPrincipal> owners = new ArrayList<>();
+ owners.add(SecurityUtils.parseKafkaPrincipal("User:user1"));
+ owners.add(SecurityUtils.parseKafkaPrincipal("User:user2"));
+ return new DescribeDelegationTokenRequest.Builder(owners).build();
+ }
+
+ private DescribeDelegationTokenResponse createDescribeTokenResponse() {
+ List<KafkaPrincipal> renewers = new ArrayList<>();
+ renewers.add(SecurityUtils.parseKafkaPrincipal("User:user1"));
+ renewers.add(SecurityUtils.parseKafkaPrincipal("User:user2"));
+
+ List<DelegationToken> tokenList = new LinkedList<>();
+
+ TokenInformation tokenInfo1 = new TokenInformation("1", SecurityUtils.parseKafkaPrincipal("User:owner"), renewers,
+ System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis());
+
+ TokenInformation tokenInfo2 = new TokenInformation("2", SecurityUtils.parseKafkaPrincipal("User:owner1"), renewers,
+ System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis());
+
+ tokenList.add(new DelegationToken(tokenInfo1, "test".getBytes()));
+ tokenList.add(new DelegationToken(tokenInfo2, "test".getBytes()));
+
+ return new DescribeDelegationTokenResponse(20, Errors.NONE, tokenList);
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
index d5f13bc..8e07e73 100644
--- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
@@ -41,7 +41,8 @@ public class ResourceTypeTest {
new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false),
new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false),
new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false),
- new AclResourceTypeTestInfo(ResourceType.TRANSACTIONAL_ID, 5, "transactional_id", false)
+ new AclResourceTypeTestInfo(ResourceType.TRANSACTIONAL_ID, 5, "transactional_id", false),
+ new AclResourceTypeTestInfo(ResourceType.DELEGATION_TOKEN, 6, "delegation_token", false)
};
@Test
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index a30c09f..787f5a7 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -118,6 +119,7 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
EasyMock.expect(server.getAuthorizationID()).andReturn("foo");
+ EasyMock.expect(server.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG)).andReturn(false);
replayAll();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index c442672..bd7fee3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -51,12 +51,15 @@ import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.ScramFormatter;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.SecurityUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -375,6 +378,39 @@ public class SaslAuthenticatorTest {
createAndCheckClientConnection(securityProtocol, "0");
}
+
+ @Test
+ public void testTokenAuthenticationOverSaslScram() throws Exception {
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+ TestJaasConfig jaasConfig = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
+
+ //create jaas config for token auth
+ Map<String, Object> options = new HashMap<>();
+ String tokenId = "token1";
+ String tokenHmac = "abcdefghijkl";
+ options.put("username", tokenId); //tokenId
+ options.put("password", tokenHmac); //token hmac
+ options.put(ScramLoginModule.TOKEN_AUTH_CONFIG, "true"); //enable token authentication
+ jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
+
+ server = createEchoServer(securityProtocol);
+
+ //Check invalid tokenId/tokenInfo in tokenCache
+ createAndCheckClientConnectionFailure(securityProtocol, "0");
+
+ //Check valid token Info and invalid credentials
+ KafkaPrincipal owner = SecurityUtils.parseKafkaPrincipal("User:Owner");
+ KafkaPrincipal renewer = SecurityUtils.parseKafkaPrincipal("User:Renewer1");
+ TokenInformation tokenInfo = new TokenInformation(tokenId, owner, Collections.singleton(renewer),
+ System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis());
+ server.tokenCache().addToken(tokenId, tokenInfo);
+ createAndCheckClientConnectionFailure(securityProtocol, "0");
+
+ //Check with valid token Info and credentials
+ updateTokenCredentialCache(tokenId, tokenHmac);
+ createAndCheckClientConnection(securityProtocol, "0");
+ }
+
/**
* Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
* prior to SASL handshake flow and that subsequent authentication succeeds
@@ -956,13 +992,13 @@ public class SaslAuthenticatorTest {
if (isScram)
ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext,
- securityProtocol, listenerName, saslMechanism, true, credentialCache) {
+ securityProtocol, listenerName, saslMechanism, true, credentialCache, null) {
@Override
protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
TransportLayer transportLayer, Subject subject) throws IOException {
return new SaslServerAuthenticator(configs, id, jaasContext, subject, null,
- credentialCache, listenerName, securityProtocol, transportLayer) {
+ credentialCache, listenerName, securityProtocol, transportLayer, null) {
@Override
protected ApiVersionsResponse apiVersionsResponse() {
@@ -998,7 +1034,7 @@ public class SaslAuthenticatorTest {
final Map<String, ?> configs = Collections.emptyMap();
final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs);
SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext,
- securityProtocol, listenerName, saslMechanism, true, null) {
+ securityProtocol, listenerName, saslMechanism, true, null, null) {
@Override
protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs, String id,
@@ -1207,4 +1243,15 @@ public class SaslAuthenticatorTest {
private ApiVersionsRequest createApiVersionsRequestV0() {
return new ApiVersionsRequest.Builder((short) 0).build();
}
+
+ /**
+ * Seeds the server's delegation token credential cache so SCRAM authentication with the
+ * given token id (used as the SCRAM username) and token HMAC (used as the password) succeeds.
+ * For every enabled SASL mechanism that is a SCRAM mechanism, derives a ScramCredential from
+ * the password and stores it in the server's token cache under the username.
+ */
+ private void updateTokenCredentialCache(String username, String password) throws NoSuchAlgorithmException {
+ for (String mechanism : (List<String>) saslServerConfigs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG)) {
+ // forMechanismName returns null for non-SCRAM mechanisms, which are skipped
+ ScramMechanism scramMechanism = ScramMechanism.forMechanismName(mechanism);
+ if (scramMechanism != null) {
+ ScramFormatter formatter = new ScramFormatter(scramMechanism);
+ ScramCredential credential = formatter.generateCredential(password, 4096);
+ server.tokenCache().credentialCache(scramMechanism.mechanismName()).put(username, credential);
+ }
+ }
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 022a099..51ea58e 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.security.scram.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -110,7 +112,7 @@ public class SaslServerAuthenticatorTest {
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
Subject subject = new Subject();
return new SaslServerAuthenticator(configs, "node", jaasContext, subject, null, new CredentialCache(),
- new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer);
+ new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, new DelegationTokenCache(ScramMechanism.mechanismNames()));
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
index 89e6260..58be6e1 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
@@ -70,7 +70,7 @@ public class ScramMessagesTest {
@Test
public void validClientFirstMessage() throws SaslException {
String nonce = formatter.secureRandomString();
- ClientFirstMessage m = new ClientFirstMessage("someuser", nonce);
+ ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, "");
checkClientFirstMessage(m, "someuser", nonce, "");
// Default format used by Kafka client: only user and nonce are specified
@@ -107,6 +107,11 @@ public class ScramMessagesTest {
str = String.format("n,,n=testuser,r=%s,%s", nonce, extension);
checkClientFirstMessage(createScramMessage(ClientFirstMessage.class, str), "testuser", nonce, "");
}
+
+ //optional tokenauth specified as extensions
+ str = String.format("n,,n=testuser,r=%s,%s", nonce, "tokenauth=true");
+ m = createScramMessage(ClientFirstMessage.class, str);
+ assertEquals("tokenauth=true", m.extensions());
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
index fd7f988..82ad914 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.security.scram;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.junit.Before;
import org.junit.Test;
@@ -43,7 +44,7 @@ public class ScramSaslServerTest {
CredentialCache.Cache<ScramCredential> credentialCache = new CredentialCache().createCache(mechanism.mechanismName(), ScramCredential.class);
credentialCache.put(USER_A, formatter.generateCredential("passwordA", 4096));
credentialCache.put(USER_B, formatter.generateCredential("passwordB", 4096));
- ScramServerCallbackHandler callbackHandler = new ScramServerCallbackHandler(credentialCache);
+ ScramServerCallbackHandler callbackHandler = new ScramServerCallbackHandler(credentialCache, new DelegationTokenCache(ScramMechanism.mechanismNames()));
saslServer = new ScramSaslServer(mechanism, new HashMap<String, Object>(), callbackHandler);
}
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 2732f6c..fa6333c 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -34,7 +34,8 @@ object AclCommand extends Logging {
Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All),
Group -> Set(Read, Describe, All),
Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
- TransactionalId -> Set(Describe, Write, All)
+ TransactionalId -> Set(Describe, Write, All),
+ DelegationToken -> Set(Describe, All)
)
def main(args: Array[String]) {
@@ -246,8 +247,11 @@ object AclCommand extends Logging {
opts.options.valuesOf(opts.transactionalIdOpt).asScala.foreach(transactionalId =>
resources += new Resource(TransactionalId, transactionalId))
+ if (opts.options.has(opts.delegationTokenOpt))
+ opts.options.valuesOf(opts.delegationTokenOpt).asScala.foreach(token => resources += new Resource(DelegationToken, token.trim))
+
if (resources.isEmpty && dieIfNoResourceFound)
- CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group>")
+ CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>")
resources
}
@@ -304,6 +308,12 @@ object AclCommand extends Logging {
"used in combination with the --producer option. Note that idempotence is enabled automatically if " +
"the producer is authorized to a particular transactional-id.")
+ val delegationTokenOpt = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
+ "A value of * indicates ACL should apply to all tokens.")
+ .withRequiredArg
+ .describedAs("delegation-token")
+ .ofType(classOf[String])
+
val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 1051993..4a37330 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -34,7 +34,10 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.utils.{LogContext, KafkaThread, Time, Utils}
+import org.apache.kafka.common.utils.LogContext
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
+import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import scala.collection.JavaConverters._
@@ -333,6 +336,34 @@ class AdminClient(val time: Time,
ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
}
+ /**
+ * Requests creation of a new delegation token from any broker.
+ * @param renewers principals allowed to renew the token
+ * @param maxTimePeriodMs maximum token lifetime in ms; -1 defers to the broker-side default
+ * @return the server error code and the created token (token metadata plus HMAC)
+ */
+ def createToken(renewers: List[KafkaPrincipal], maxTimePeriodMs: Long = -1): (Errors, DelegationToken) = {
+ val responseBody = sendAnyNode(ApiKeys.CREATE_DELEGATION_TOKEN, new CreateDelegationTokenRequest.Builder(renewers.asJava, maxTimePeriodMs))
+ val response = responseBody.asInstanceOf[CreateDelegationTokenResponse]
+ // Rebuild the token metadata locally from the response fields
+ val tokenInfo = new TokenInformation(response.tokenId, response.owner, renewers.asJava,
+ response.issueTimestamp, response.maxTimestamp, response.expiryTimestamp)
+ (response.error, new DelegationToken(tokenInfo, response.hmacBytes))
+ }
+
+ /**
+ * Renews an existing delegation token identified by its HMAC.
+ * @param hmac the token's HMAC bytes
+ * @param renewTimePeriod renew period in ms; -1 defers to the broker-side default
+ * @return the server error code and the token's new expiry timestamp
+ */
+ def renewToken(hmac: ByteBuffer, renewTimePeriod: Long = -1): (Errors, Long) = {
+ val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new RenewDelegationTokenRequest.Builder(hmac, renewTimePeriod))
+ val response = responseBody.asInstanceOf[RenewDelegationTokenResponse]
+ (response.error, response.expiryTimestamp)
+ }
+
+ /**
+ * Expires a delegation token identified by its HMAC.
+ * @param hmac the token's HMAC bytes
+ * @param expiryTimeStamp expiry period in ms; -1 invalidates the token immediately
+ * @return the server error code and the token's new expiry timestamp
+ */
+ def expireToken(hmac: ByteBuffer, expiryTimeStamp: Long = -1): (Errors, Long) = {
+ val responseBody = sendAnyNode(ApiKeys.EXPIRE_DELEGATION_TOKEN, new ExpireDelegationTokenRequest.Builder(hmac, expiryTimeStamp))
+ val response = responseBody.asInstanceOf[ExpireDelegationTokenResponse]
+ (response.error, response.expiryTimestamp)
+ }
+
+ /**
+ * Describes delegation tokens, optionally filtered by owner principals.
+ * @param owners owners to filter on; null describes tokens for all owners
+ * @return the server error code and the list of matching tokens
+ */
+ def describeToken(owners: List[KafkaPrincipal]): (Errors, List[DelegationToken]) = {
+ val ownersList = if (owners == null) null else owners.asJava
+ // Must use the DESCRIBE_DELEGATION_TOKEN api key here; the request being built is a
+ // DescribeDelegationTokenRequest, and sending it under RENEW_DELEGATION_TOKEN routes
+ // the describe request to the wrong API.
+ val responseBody = sendAnyNode(ApiKeys.DESCRIBE_DELEGATION_TOKEN, new DescribeDelegationTokenRequest.Builder(ownersList))
+ val response = responseBody.asInstanceOf[DescribeDelegationTokenResponse]
+ (response.error, response.tokens().asScala.toList)
+ }
+
+
def close() {
running = false
try {
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
new file mode 100644
index 0000000..6c5d1ce
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.nio.ByteBuffer
+
+import joptsimple._
+import kafka.utils.{CommandLineUtils, Exit, Logging}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+import org.apache.kafka.common.utils.{Base64, SecurityUtils, Utils}
+
+import scala.collection.JavaConverters._
+import scala.collection.Set
+
+/**
+ * A command to manage delegation token.
+ */
+object DelegationTokenCommand extends Logging {
+
+ /**
+ * Entry point: parses the command line, validates that exactly one action
+ * (--create / --renew / --expire / --describe) was requested, dispatches to it,
+ * and exits with a non-zero code on failure.
+ */
+ def main(args: Array[String]): Unit = {
+ val opts = new DelegationTokenCommandOptions(args)
+
+ if(args.length == 0)
+ CommandLineUtils.printUsageAndDie(opts.parser, "Tool to create, renew, expire, or describe delegation tokens.")
+
+ // should have exactly one action
+ val actions = Seq(opts.createOpt, opts.renewOpt, opts.expiryOpt, opts.describeOpt).count(opts.options.has _)
+ if(actions != 1)
+ CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe")
+
+ opts.checkArgs()
+
+ val adminClient = createAdminClient(opts)
+
+ var exitCode = 0
+ try {
+ if(opts.options.has(opts.createOpt))
+ createToken(adminClient, opts)
+ else if(opts.options.has(opts.renewOpt))
+ renewToken(adminClient, opts)
+ else if(opts.options.has(opts.expiryOpt))
+ expireToken(adminClient, opts)
+ else if(opts.options.has(opts.describeOpt))
+ describeToken(adminClient, opts)
+ } catch {
+ case e: Throwable =>
+ println("Error while executing delegation token command : " + e.getMessage)
+ error(Utils.stackTrace(e))
+ exitCode = 1
+ } finally {
+ // Exit.exit in finally so both the success and failure paths terminate the JVM
+ adminClient.close()
+ Exit.exit(exitCode)
+ }
+ }
+
+ /** Creates a token with the requested renewers and max lifetime, printing the result. */
+ def createToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+ val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt)
+ val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue
+
+ println("Calling create token operation with renewers :" + renewerPrincipals +" , max-life-time-period :"+ maxLifeTimeMs)
+ val response = adminClient.createToken(renewerPrincipals, maxLifeTimeMs)
+ response match {
+ case (Errors.NONE, token) => println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
+ case (e, _) => throw new AdminOperationException(e.message)
+ }
+ }
+
+ /** Prints the tokens in a fixed-width table: id, HMAC, owner, renewers and timestamps. */
+ def printToken(tokens: List[DelegationToken]): Unit = {
+ print("\n%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format("TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
+ for (token <- tokens) {
+ val tokenInfo = token.tokenInfo
+ print("\n%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+ tokenInfo.tokenId,
+ token.hmacAsBase64String,
+ tokenInfo.owner,
+ tokenInfo.renewersAsString,
+ tokenInfo.issueTimestamp,
+ tokenInfo.expiryTimestamp,
+ tokenInfo.maxTimestamp))
+ println()
+ }
+ }
+
+ /** Parses the principals given for the option, or an empty list when the option is absent. */
+ private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): List[KafkaPrincipal] = {
+ if (opts.options.has(principalOptionSpec))
+ opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList
+ else
+ List.empty[KafkaPrincipal]
+ }
+
+ /** Renews the token identified by the base64-encoded --hmac for --renew-time-period ms. */
+ def renewToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+ val hmac = opts.options.valueOf(opts.hmacOpt)
+ val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
+ println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs)
+ val response = adminClient.renewToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), renewTimePeriodMs)
+ response match {
+ case (Errors.NONE, expiryTimeStamp) => println("Completed renew operation. New expiry timestamp : %s".format(expiryTimeStamp))
+ case (e, _) => throw new AdminOperationException(e.message)
+ }
+ }
+
+ /** Expires the token identified by the base64-encoded --hmac after --expiry-time-period ms. */
+ def expireToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+ val hmac = opts.options.valueOf(opts.hmacOpt)
+ val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
+ println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs)
+ val response = adminClient.expireToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), expiryTimePeriodMs)
+ response match {
+ case (Errors.NONE, expiryTimeStamp) => println("Completed expire operation. New expiry timestamp : %s".format(expiryTimeStamp))
+ case (e, _) => throw new AdminOperationException(e.message)
+ }
+ }
+
+ /** Describes tokens, optionally restricted to the owners given via --owner-principal. */
+ def describeToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+ val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt)
+ println("Calling describe token operation for owners :" + ownerPrincipals)
+ val response = adminClient.describeToken(ownerPrincipals)
+ response match {
+ case (Errors.NONE, tokens) => println("Total Number of tokens : %s".format(tokens.size)); printToken(tokens)
+ case (e, _) => throw new AdminOperationException(e.message)
+ }
+ }
+
+ /** Builds an AdminClient from --command-config properties plus --bootstrap-server. */
+ private def createAdminClient(opts: DelegationTokenCommandOptions): AdminClient = {
+ val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+ AdminClient.create(props)
+ }
+
+ class DelegationTokenCommandOptions(args: Array[String]) {
+ val BootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping."
+ val CommandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+ " operations are allowed in secure mode only. This config file is used to pass security related configs."
+
+ val parser = new OptionParser(false)
+ val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
+ .withRequiredArg
+ .ofType(classOf[String])
+ val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+ .withRequiredArg
+ .ofType(classOf[String])
+
+ val createOpt = parser.accepts("create", "Create a new delegation token.")
+ val renewOpt = parser.accepts("renew", "Renew delegation token.")
+ val expiryOpt = parser.accepts("expire", "Expire delegation token.")
+ val describeOpt = parser.accepts("describe", "Describe delegation tokens.")
+
+ val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It should be in principalType:name format.")
+ .withOptionalArg()
+ .ofType(classOf[String])
+
+ val renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a kafka principal. It should be in principalType:name format.")
+ .withOptionalArg()
+ .ofType(classOf[String])
+
+ val maxLifeTimeOpt = parser.accepts("max-life-time-period", "Max life period for the token in milliseconds. If the value is -1," +
+ " then token max life time will default to a server side config value (delegation.token.max.lifetime.ms).")
+ .withOptionalArg()
+ .ofType(classOf[Long])
+
+ val renewTimePeriodOpt = parser.accepts("renew-time-period", "Renew time period in milliseconds. If the value is -1, then the" +
+ " renew time period will default to a server side config value (delegation.token.expiry.time.ms).")
+ .withOptionalArg()
+ .ofType(classOf[Long])
+
+ val expiryTimePeriodOpt = parser.accepts("expiry-time-period", "Expiry time period in milliseconds. If the value is -1, then the" +
+ " token will get invalidated immediately." )
+ .withOptionalArg()
+ .ofType(classOf[Long])
+
+ val hmacOpt = parser.accepts("hmac", "HMAC of the delegation token")
+ .withOptionalArg
+ .ofType(classOf[String])
+
+ val options = parser.parse(args : _*)
+
+ def checkArgs() {
+ // check required args
+ CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, commandConfigOpt)
+
+ if (options.has(createOpt))
+ CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt)
+
+ if (options.has(renewOpt))
+ CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, renewTimePeriodOpt)
+
+ if (options.has(expiryOpt))
+ CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, expiryTimePeriodOpt)
+
+ // check invalid args: each action rejects the value options that belong to other actions.
+ // Mutual exclusivity of the action flags themselves is already enforced in main().
+ CommandLineUtils.checkInvalidArgs(parser, options, createOpt, Set(hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt, ownerPrincipalsOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, Set(renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, Set(renewPrincipalsOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(renewPrincipalsOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt))
+ }
+ }
+}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6b5c34e..a8e1249 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -55,7 +55,8 @@ object KafkaController extends Logging {
}
class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo,
- threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+ tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+
this.logIdent = s"[Controller id=${config.brokerId}] "
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
@@ -90,6 +91,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
@volatile private var globalTopicCount = 0
@volatile private var globalPartitionCount = 0
+ /* single-thread scheduler to clean expired tokens */
+ private val tokenCleanScheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "delegation-token-cleaner")
+
newGauge(
"ActiveControllerCount",
new Gauge[Int] {
@@ -243,6 +247,15 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
if (config.autoLeaderRebalanceEnable) {
scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
}
+
+ if (config.tokenAuthEnabled) {
+ info("starting the token expiry check scheduler")
+ tokenCleanScheduler.startup()
+ tokenCleanScheduler.schedule(name = "delete-expired-tokens",
+ fun = tokenManager.expireTokens,
+ period = config.delegationTokenExpiryCheckIntervalMs,
+ unit = TimeUnit.MILLISECONDS)
+ }
}
private def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = {
@@ -272,6 +285,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
globalTopicCount = 0
globalPartitionCount = 0
+ // stop token expiry check scheduler
+ if (tokenCleanScheduler.isStarted)
+ tokenCleanScheduler.shutdown()
+
// de-register partition ISR listener for on-going partition reassignment task
unregisterPartitionReassignmentIsrChangeHandlers()
// shutdown partition state machine
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 200bfe2..d5e49a5 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -437,7 +437,7 @@ private[kafka] class Processor(val id: Int,
)
private val selector = createSelector(
- ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
+ ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache, credentialProvider.tokenCache))
// Visible to override for testing
protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = new KSelector(
maxRequestSize,
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 5d9d7ba..120e8f9 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -23,8 +23,9 @@ import org.apache.kafka.common.security.authenticator.CredentialCache
import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef._
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
-class CredentialProvider(saslEnabledMechanisms: List[String]) {
+class CredentialProvider(saslEnabledMechanisms: List[String], val tokenCache: DelegationTokenCache) {
val credentialCache = new CredentialCache
ScramCredentialUtils.createCache(credentialCache, saslEnabledMechanisms)
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index b046ddd..66da610 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -49,6 +49,12 @@ case object TransactionalId extends ResourceType {
val toJava = JResourceType.TRANSACTIONAL_ID
}
+case object DelegationToken extends ResourceType {
+ val name = "DelegationToken"
+ val error = Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
+ val toJava = JResourceType.DELEGATION_TOKEN;
+}
+
object ResourceType {
def fromString(resourceType: String): ResourceType = {
@@ -56,7 +62,7 @@ object ResourceType {
rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
}
- def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId)
+ def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId, DelegationToken)
def fromJava(operation: JResourceType): ResourceType = fromString(operation.toString.replaceAll("_", ""))
}
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
new file mode 100644
index 0000000..008dc32
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -0,0 +1,515 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.security.InvalidKeyException
+import javax.crypto.spec.SecretKeySpec
+import javax.crypto.{Mac, SecretKey}
+
+import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{CoreUtils, Json, Logging}
+import kafka.zk.{DelegationTokenChangeNotificationSequenceZNode, DelegationTokenChangeNotificationZNode, DelegationTokensZNode, KafkaZkClient}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.scram.{ScramCredential, ScramFormatter, ScramMechanism}
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.utils.{Base64, Sanitizer, SecurityUtils, Time}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+object DelegationTokenManager {
+ val DefaultHmacAlgorithm = "HmacSHA512"
+ // JSON field names used when (de)serializing token information for ZooKeeper
+ val OwnerKey ="owner"
+ val RenewersKey = "renewers"
+ val IssueTimestampKey = "issueTimestamp"
+ val MaxTimestampKey = "maxTimestamp"
+ val ExpiryTimestampKey = "expiryTimestamp"
+ val TokenIdKey = "tokenId"
+ val VersionKey = "version"
+ val CurrentVersion = 1
+ val ErrorTimestamp = -1
+
+ /**
+ * Computes the HMAC of the token id with a secret key given as a string.
+ * @param tokenId token id whose UTF-8 bytes are signed
+ * @param secretKey secret key material; its UTF-8 bytes are wrapped into a SecretKey
+ * @return the raw HMAC bytes
+ */
+ def createHmac(tokenId: String, secretKey: String) : Array[Byte] = {
+ createHmac(tokenId, createSecretKey(secretKey.getBytes(StandardCharsets.UTF_8)))
+ }
+
+ /**
+ * Convert the byte[] to a secret key
+ * @param keybytes the byte[] to create the secret key from
+ * @return the secret key
+ */
+ def createSecretKey(keybytes: Array[Byte]) : SecretKey = {
+ new SecretKeySpec(keybytes, DefaultHmacAlgorithm)
+ }
+
+ /**
+ * Computes the HMAC of the token id and returns it base64-encoded.
+ * @param tokenId token id whose UTF-8 bytes are signed
+ * @param secretKey the secret key used for signing
+ * @return base64 string of the generated HMAC
+ */
+ def createBase64HMAC(tokenId: String, secretKey: SecretKey) : String = {
+ val hmac = createHmac(tokenId, secretKey)
+ Base64.encoder.encodeToString(hmac)
+ }
+
+ /**
+ * Compute HMAC of the identifier using the secret key
+ * @param tokenId the bytes of the identifier
+ * @param secretKey the secret key
+ * @return String of the generated hmac
+ */
+ def createHmac(tokenId: String, secretKey: SecretKey) : Array[Byte] = {
+ val mac = Mac.getInstance(DefaultHmacAlgorithm)
+ try
+ mac.init(secretKey)
+ catch {
+ case ike: InvalidKeyException => throw new IllegalArgumentException("Invalid key to HMAC computation", ike);
+ }
+ mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8))
+ }
+
+ // Serializes token information (not the HMAC) to a JSON-compatible map for ZK storage.
+ // Principal strings are sanitized so they are safe to embed in the JSON payload.
+ def toJsonCompatibleMap(token: DelegationToken): Map[String, Any] = {
+ val tokenInfo = token.tokenInfo
+ val tokenInfoMap = mutable.Map[String, Any]()
+ tokenInfoMap(VersionKey) = CurrentVersion
+ tokenInfoMap(OwnerKey) = Sanitizer.sanitize(tokenInfo.ownerAsString)
+ tokenInfoMap(RenewersKey) = tokenInfo.renewersAsString.asScala.map(e => Sanitizer.sanitize(e)).asJava
+ tokenInfoMap(IssueTimestampKey) = tokenInfo.issueTimestamp
+ tokenInfoMap(MaxTimestampKey) = tokenInfo.maxTimestamp
+ tokenInfoMap(ExpiryTimestampKey) = tokenInfo.expiryTimestamp
+ tokenInfoMap(TokenIdKey) = tokenInfo.tokenId()
+ tokenInfoMap.toMap
+ }
+
+ // Deserializes token information from the JSON bytes stored in ZK; the inverse of
+ // toJsonCompatibleMap. Returns None for null/empty or unparseable input; requires the
+ // stored version to match CurrentVersion.
+ def fromBytes(bytes: Array[Byte]): Option[TokenInformation] = {
+ if (bytes == null || bytes.isEmpty)
+ return None
+
+ Json.parseBytes(bytes) match {
+ case Some(js) =>
+ val mainJs = js.asJsonObject
+ require(mainJs(VersionKey).to[Int] == CurrentVersion)
+ val owner = SecurityUtils.parseKafkaPrincipal(Sanitizer.desanitize(mainJs(OwnerKey).to[String]))
+ val renewerStr = mainJs(RenewersKey).to[Seq[String]]
+ val renewers = renewerStr.map(Sanitizer.desanitize(_)).map(SecurityUtils.parseKafkaPrincipal(_))
+ val issueTimestamp = mainJs(IssueTimestampKey).to[Long]
+ val expiryTimestamp = mainJs(ExpiryTimestampKey).to[Long]
+ val maxTimestamp = mainJs(MaxTimestampKey).to[Long]
+ val tokenId = mainJs(TokenIdKey).to[String]
+
+ val tokenInfo = new TokenInformation(tokenId, owner, renewers.asJava,
+ issueTimestamp, maxTimestamp, expiryTimestamp)
+
+ Some(tokenInfo)
+ case None =>
+ None
+ }
+ }
+
+ // Decides whether a token is visible to the requesting principal for a describe request:
+ // tokens not matching the requested owners filter are excluded; owners and renewers always
+ // see their own tokens; otherwise visibility falls back to the authorizeToken check on the
+ // token id. NOTE(review): owners is an Option, so !owners.isEmpty means "a filter was given";
+ // a Some(List.empty) filter excludes everything.
+ def filterToken(requestedPrincipal: KafkaPrincipal, owners : Option[List[KafkaPrincipal]], token: TokenInformation, authorizeToken: String => Boolean) : Boolean = {
+
+ val allow =
+ //exclude tokens which are not requested
+ if (!owners.isEmpty && !owners.get.exists(owner => token.ownerOrRenewer(owner))) {
+ false
+ //Owners and the renewers can describe their own tokens
+ } else if (token.ownerOrRenewer(requestedPrincipal)) {
+ true
+ // Check permission for non-owned tokens
+ } else if ((authorizeToken(token.tokenId))) {
+ true
+ }
+ else {
+ false
+ }
+
+ allow
+ }
+}
+
+class DelegationTokenManager(val config: KafkaConfig,
+ val tokenCache: DelegationTokenCache,
+ val time: Time,
+ val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
+ this.logIdent = "[Token Manager on Broker " + config.brokerId + "]: "
+
+ import DelegationTokenManager._
+
+ // Callback types invoked with the outcome of the corresponding token operations
+ type CreateResponseCallback = CreateTokenResult => Unit
+ type RenewResponseCallback = (Errors, Long) => Unit
+ type ExpireResponseCallback = (Errors, Long) => Unit
+ type DescribeResponseCallback = (Errors, List[DelegationToken]) => Unit
+
+ // Secret key derived from the configured delegation token master key; null when token
+ // authentication is disabled or the configured master key is empty.
+ val secretKey = {
+ val keyBytes = if (config.tokenAuthEnabled) config.delegationTokenMasterKey.value.getBytes(StandardCharsets.UTF_8) else null
+ if (keyBytes == null || keyBytes.length == 0) null
+ else
+ createSecretKey(keyBytes)
+ }
+
+ val tokenMaxLifetime: Long = config.delegationTokenMaxLifeMs
+ val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
+ val tokenRemoverScanInterval: Long = config.delegationTokenExpiryCheckIntervalMs
+ // Guards cache loading; tokenChangeListener is created on startup when token auth is enabled
+ private val lock = new Object()
+ private var tokenChangeListener: ZkNodeChangeNotificationListener = null
+
+ /**
+ * If token authentication is enabled: creates the delegation token ZK paths, loads all
+ * existing tokens into the cache, and registers a ZK change-notification listener so
+ * token updates made on other brokers are reflected locally. No-op otherwise.
+ */
+ def startup() = {
+ if (config.tokenAuthEnabled) {
+ zkClient.createDelegationTokenPaths
+ loadCache
+ tokenChangeListener = new ZkNodeChangeNotificationListener(zkClient, DelegationTokenChangeNotificationZNode.path, DelegationTokenChangeNotificationSequenceZNode.SequenceNumberPrefix, TokenChangedNotificationHandler)
+ tokenChangeListener.init
+ }
+ }
+
+ def shutdown() = {
+ if (config.tokenAuthEnabled) {
+ if (tokenChangeListener != null) tokenChangeListener.close()
+ }
+ }
+
+ private def loadCache() {
+ lock.synchronized {
+ val tokens = zkClient.getChildren(DelegationTokensZNode.path)
+ info(s"Loading the token cache. Total token count : " + tokens.size)
+ for (tokenId <- tokens) {
+ try {
+ getTokenFromZk(tokenId) match {
+ case Some(token) => updateCache(token)
+ case None =>
+ }
+ } catch {
+ case ex: Throwable => error(s"Error while getting Token for tokenId :$tokenId", ex)
+ }
+ }
+ }
+ }
+
+ private def getTokenFromZk(tokenId: String): Option[DelegationToken] = {
+ zkClient.getDelegationTokenInfo(tokenId) match {
+ case Some(tokenInformation) => {
+ val hmac = createHmac(tokenId, secretKey)
+ Some(new DelegationToken(tokenInformation, hmac))
+ }
+ case None =>
+ None
+ }
+ }
+
+ /**
+ *
+ * @param token
+ */
+ private def updateCache(token: DelegationToken): Unit = {
+ val hmacString = token.hmacAsBase64String
+ val scramCredentialMap = prepareScramCredentials(hmacString)
+ tokenCache.updateCache(token, scramCredentialMap.asJava)
+ }
+ /**
+ * @param hmacString
+ */
+ private def prepareScramCredentials(hmacString: String) : Map[String, ScramCredential] = {
+ val scramCredentialMap = mutable.Map[String, ScramCredential]()
+
+ def scramCredential(mechanism: ScramMechanism): ScramCredential = {
+ new ScramFormatter(mechanism).generateCredential(hmacString, mechanism.minIterations)
+ }
+
+ for (mechanism <- ScramMechanism.values)
+ scramCredentialMap(mechanism.mechanismName) = scramCredential(mechanism)
+
+ scramCredentialMap.toMap
+ }
+
+ /**
+ *
+ * @param owner
+ * @param renewers
+ * @param maxLifeTimeMs
+ * @param responseCallback
+ */
+ def createToken(owner: KafkaPrincipal,
+ renewers: List[KafkaPrincipal],
+ maxLifeTimeMs: Long,
+ responseCallback: CreateResponseCallback) {
+
+ if (!config.tokenAuthEnabled) {
+ responseCallback(CreateTokenResult(-1, -1, -1, "", Array[Byte](), Errors.DELEGATION_TOKEN_AUTH_DISABLED))
+ } else {
+ lock.synchronized {
+ val tokenId = CoreUtils.generateUuidAsBase64
+
+ val issueTimeStamp = time.milliseconds
+ val maxLifeTime = if (maxLifeTimeMs <= 0) tokenMaxLifetime else Math.min(maxLifeTimeMs, tokenMaxLifetime)
+ val maxLifeTimeStamp = issueTimeStamp + maxLifeTime
+ val expiryTimeStamp = Math.min(maxLifeTimeStamp, issueTimeStamp + defaultTokenRenewTime)
+
+ val tokenInfo = new TokenInformation(tokenId, owner, renewers.asJava, issueTimeStamp, maxLifeTimeStamp, expiryTimeStamp)
+
+ val hmac = createHmac(tokenId, secretKey)
+ val token = new DelegationToken(tokenInfo, hmac)
+ updateToken(token)
+ info(s"Created a delegation token : $tokenId for owner : $owner")
+ responseCallback(CreateTokenResult(issueTimeStamp, expiryTimeStamp, maxLifeTimeStamp, tokenId, hmac, Errors.NONE))
+ }
+ }
+ }
+
+ /**
+ *
+ * @param principal
+ * @param hmac
+ * @param renewLifeTimeMs
+ * @param renewCallback
+ */
+ def renewToken(principal: KafkaPrincipal,
+ hmac: ByteBuffer,
+ renewLifeTimeMs: Long,
+ renewCallback: RenewResponseCallback) {
+
+ if (!config.tokenAuthEnabled) {
+ renewCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, -1)
+ } else {
+ lock.synchronized {
+ getToken(hmac) match {
+ case Some(token) => {
+ val now = time.milliseconds
+ val tokenInfo = token.tokenInfo
+
+ if (!allowedToRenew(principal, tokenInfo)) {
+ renewCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
+ } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
+ renewCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
+ } else {
+ val renewLifeTime = if (renewLifeTimeMs < 0) defaultTokenRenewTime else renewLifeTimeMs
+ val renewTimeStamp = now + renewLifeTime
+ val expiryTimeStamp = Math.min(tokenInfo.maxTimestamp, renewTimeStamp)
+ tokenInfo.setExpiryTimestamp(expiryTimeStamp)
+
+ updateToken(token)
+ info(s"Delegation token renewed for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+ renewCallback(Errors.NONE, expiryTimeStamp)
+ }
+ }
+ case None => renewCallback(Errors.DELEGATION_TOKEN_NOT_FOUND, -1)
+ }
+ }
+ }
+ }
+
+ /**
+ * @param token
+ */
+ private def updateToken(token: DelegationToken): Unit = {
+ zkClient.setOrCreateDelegationToken(token)
+ updateCache(token)
+ zkClient.createTokenChangeNotification(token.tokenInfo.tokenId())
+ }
+
+ /**
+ *
+ * @param hmac
+ * @return
+ */
+ private def getToken(hmac: ByteBuffer): Option[DelegationToken] = {
+ try {
+ val byteArray = new Array[Byte](hmac.remaining)
+ hmac.get(byteArray)
+ val base64Pwd = Base64.encoder.encodeToString(byteArray)
+ val tokenInfo = tokenCache.tokenForHmac(base64Pwd)
+ if (tokenInfo == null) None else Some(new DelegationToken(tokenInfo, byteArray))
+ } catch {
+ case e: Exception =>
+ error("Exception while getting token for hmac", e)
+ None
+ }
+ }
+
+ /**
+ *
+ * @param principal
+ * @param tokenInfo
+ * @return
+ */
+ private def allowedToRenew(principal: KafkaPrincipal, tokenInfo: TokenInformation): Boolean = {
+ if (principal.equals(tokenInfo.owner) || tokenInfo.renewers.asScala.toList.contains(principal)) true else false
+ }
+
+ /**
+ *
+ * @param tokenId
+ * @return
+ */
+ def getToken(tokenId: String): Option[DelegationToken] = {
+ val tokenInfo = tokenCache.token(tokenId)
+ if (tokenInfo != null) Some(getToken(tokenInfo)) else None
+ }
+
+ /**
+ *
+ * @param tokenInfo
+ * @return
+ */
+ private def getToken(tokenInfo: TokenInformation): DelegationToken = {
+ val hmac = createHmac(tokenInfo.tokenId, secretKey)
+ new DelegationToken(tokenInfo, hmac)
+ }
+
+ /**
+ *
+ * @param principal
+ * @param hmac
+ * @param expireLifeTimeMs
+ * @param expireResponseCallback
+ */
+ def expireToken(principal: KafkaPrincipal,
+ hmac: ByteBuffer,
+ expireLifeTimeMs: Long,
+ expireResponseCallback: ExpireResponseCallback) {
+
+ if (!config.tokenAuthEnabled) {
+ expireResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, -1)
+ } else {
+ lock.synchronized {
+ getToken(hmac) match {
+ case Some(token) => {
+ val tokenInfo = token.tokenInfo
+ val now = time.milliseconds
+
+ if (!allowedToRenew(principal, tokenInfo)) {
+ expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
+ } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
+ expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
+ } else if (expireLifeTimeMs < 0) { //expire immediately
+ removeToken(tokenInfo.tokenId)
+ info(s"Token expired for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+ expireResponseCallback(Errors.NONE, now)
+ } else {
+ //set expiry time stamp
+ val expiryTimeStamp = Math.min(tokenInfo.maxTimestamp, now + expireLifeTimeMs)
+ tokenInfo.setExpiryTimestamp(expiryTimeStamp)
+
+ updateToken(token)
+ info(s"Updated expiry time for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+ expireResponseCallback(Errors.NONE, expiryTimeStamp)
+ }
+ }
+ case None => expireResponseCallback(Errors.DELEGATION_TOKEN_NOT_FOUND, -1)
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @param tokenId
+ */
+ private def removeToken(tokenId: String): Unit = {
+ zkClient.deleteDelegationToken(tokenId)
+ removeCache(tokenId)
+ zkClient.createTokenChangeNotification(tokenId)
+ }
+
+ /**
+ *
+ * @param tokenId
+ */
+ private def removeCache(tokenId: String): Unit = {
+ tokenCache.removeCache(tokenId)
+ }
+
+ /**
+ *
+ * @return
+ */
+ def expireTokens(): Unit = {
+ lock.synchronized {
+ for (tokenInfo <- getAllTokenInformation) {
+ val now = time.milliseconds
+ if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
+ info(s"Delegation token expired for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+ removeToken(tokenInfo.tokenId)
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @return
+ */
+ def getAllTokenInformation(): List[TokenInformation] = {
+ tokenCache.tokens.asScala.toList
+ }
+
+ def getTokens(filterToken: TokenInformation => Boolean): List[DelegationToken] = {
+ getAllTokenInformation().filter(filterToken).map(token => getToken(token))
+ }
+
+ object TokenChangedNotificationHandler extends NotificationHandler {
+ override def processNotification(tokenIdBytes: Array[Byte]) {
+ lock.synchronized {
+ val tokenId = new String(tokenIdBytes, StandardCharsets.UTF_8)
+ info(s"Processing Token Notification for tokenId : $tokenId")
+ getTokenFromZk(tokenId) match {
+ case Some(token) => updateCache(token)
+ case None => removeCache(tokenId)
+ }
+ }
+ }
+ }
+
+}
+
+/**
+ * Result of a createToken request: issue/expiry/max timestamps, the generated token id and
+ * HMAC, and an error code (NONE on success). equals/hashCode are overridden because the
+ * case-class defaults compare the hmac array by reference.
+ */
+case class CreateTokenResult(issueTimestamp: Long,
+ expiryTimestamp: Long,
+ maxTimestamp: Long,
+ tokenId: String,
+ hmac: Array[Byte],
+ error: Errors) {
+
+ override def equals(other: Any): Boolean = {
+ other match {
+ case that: CreateTokenResult =>
+ error.equals(that.error) &&
+ tokenId.equals(that.tokenId) &&
+ issueTimestamp.equals(that.issueTimestamp) &&
+ expiryTimestamp.equals(that.expiryTimestamp) &&
+ maxTimestamp.equals(that.maxTimestamp) &&
+ (hmac sameElements that.hmac)
+ case _ => false
+ }
+ }
+
+ override def hashCode(): Int = {
+ // Hash the hmac by *content* (java.util.Arrays.hashCode) so that hashCode is consistent
+ // with equals, which compares hmac via sameElements; Array.hashCode is identity-based and
+ // would give equal results different hash codes.
+ val fields = Seq(issueTimestamp, expiryTimestamp, maxTimestamp, tokenId, java.util.Arrays.hashCode(hmac), error)
+ fields.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 758a305..b145d3c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -56,6 +56,8 @@ import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshake
import org.apache.kafka.common.resource.{Resource => AdminResource}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
import DescribeLogDirsResponse.LogDirInfo
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import scala.collection.{mutable, _}
import scala.collection.JavaConverters._
@@ -80,7 +82,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val quotas: QuotaManagers,
brokerTopicStats: BrokerTopicStats,
val clusterId: String,
- time: Time) extends Logging {
+ time: Time,
+ val tokenManager: DelegationTokenManager) extends Logging {
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
val adminZkClient = new AdminZkClient(zkClient)
@@ -135,6 +138,10 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
+ case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
+ case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
+ case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
+ case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> handleDescribeTokensRequest(request)
}
} catch {
case e: FatalExitError => throw e
@@ -1985,6 +1992,127 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(throttleTimeMs, logDirInfos.asJava))
}
+ /**
+ * Handles CreateDelegationToken: validates that the request arrived over an allowed channel
+ * and that all renewers are User principals, then delegates to the token manager.
+ */
+ def handleCreateTokenRequest(request: RequestChannel.Request) {
+ val createTokenRequest = request.body[CreateDelegationTokenRequest]
+
+ // the callback for sending a create token response
+ def sendResponseCallback(createResult: CreateTokenResult) {
+ trace("Sending create token response for correlation id %d to client %s."
+ .format(request.header.correlationId, request.header.clientId))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new CreateDelegationTokenResponse(requestThrottleMs, createResult.error, request.session.principal, createResult.issueTimestamp,
+ createResult.expiryTimestamp, createResult.maxTimestamp, createResult.tokenId, ByteBuffer.wrap(createResult.hmac)))
+ }
+
+ if (!allowTokenRequests(request))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new CreateDelegationTokenResponse(requestThrottleMs, Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, request.session.principal))
+ else {
+ val renewerList = createTokenRequest.renewers().asScala.toList
+
+ // Only User principals may be renewers; reject anything else up front.
+ if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) {
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new CreateDelegationTokenResponse(requestThrottleMs, Errors.INVALID_PRINCIPAL_TYPE, request.session.principal))
+ }
+ else {
+ tokenManager.createToken(
+ request.session.principal,
+ renewerList, // reuse the already-converted (and validated) list instead of converting again
+ createTokenRequest.maxLifeTime(),
+ sendResponseCallback
+ )
+ }
+ }
+ }
+
+ /**
+ * Handles RenewDelegationToken: validates the channel, then delegates renewal to the
+ * token manager, replying with the error code and new expiry timestamp.
+ */
+ def handleRenewTokenRequest(request: RequestChannel.Request) {
+ val renewTokenRequest = request.body[RenewDelegationTokenRequest]
+
+ // the callback for sending a renew token response
+ def sendResponseCallback(error: Errors, expiryTimestamp: Long) {
+ // Fixed: the format string previously had three placeholders ("%s ... %d ... %s") but only
+ // two arguments, which throws MissingFormatArgumentException when trace logging is enabled.
+ trace("Sending renew token response for correlation id %d to client %s."
+ .format(request.header.correlationId, request.header.clientId))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new RenewDelegationTokenResponse(requestThrottleMs, error, expiryTimestamp))
+ }
+
+ if (!allowTokenRequests(request))
+ sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, DelegationTokenManager.ErrorTimestamp)
+ else {
+ tokenManager.renewToken(
+ request.session.principal,
+ renewTokenRequest.hmac,
+ renewTokenRequest.renewTimePeriod(),
+ sendResponseCallback
+ )
+ }
+ }
+
+ // Handles ExpireDelegationToken: validates the channel, then delegates expiration to the
+ // token manager, replying with the error code and effective expiry timestamp.
+ def handleExpireTokenRequest(request: RequestChannel.Request) {
+ val expireTokenRequest = request.body[ExpireDelegationTokenRequest]
+
+ // the callback for sending a expire token response
+ def sendResponseCallback(error: Errors, expiryTimestamp: Long) {
+ trace("Sending expire token response for correlation id %d to client %s."
+ .format(request.header.correlationId, request.header.clientId))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new ExpireDelegationTokenResponse(requestThrottleMs, error, expiryTimestamp))
+ }
+
+ if (!allowTokenRequests(request))
+ sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, DelegationTokenManager.ErrorTimestamp)
+ else {
+ tokenManager.expireToken(
+ request.session.principal,
+ expireTokenRequest.hmac(),
+ expireTokenRequest.expiryTimePeriod(),
+ sendResponseCallback
+ )
+ }
+ }
+
+ // Handles DescribeDelegationToken: returns the tokens visible to the session principal,
+ // filtered by the requested owner list and per-token DESCRIBE authorization.
+ def handleDescribeTokensRequest(request: RequestChannel.Request) {
+ val describeTokenRequest = request.body[DescribeDelegationTokenRequest]
+
+ // the callback for sending a describe token response
+ def sendResponseCallback(error: Errors, tokenDetails: List[DelegationToken]) {
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new DescribeDelegationTokenResponse(requestThrottleMs, error, tokenDetails.asJava))
+ trace("Sending describe token response for correlation id %d to client %s."
+ .format(request.header.correlationId, request.header.clientId))
+ }
+
+ if (!allowTokenRequests(request))
+ sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, List.empty)
+ else if (!config.tokenAuthEnabled)
+ sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, List.empty)
+ else {
+ val requestPrincipal = request.session.principal
+
+ // An explicitly empty owner list means "describe no tokens" (distinct from a null list,
+ // which means "all owners").
+ if (describeTokenRequest.ownersListEmpty()) {
+ sendResponseCallback(Errors.NONE, List())
+ }
+ else {
+ val owners = if (describeTokenRequest.owners == null) None else Some(describeTokenRequest.owners.asScala.toList)
+ def authorizeToken(tokenId: String) = authorize(request.session, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId))
+ def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken)
+ val tokens = tokenManager.getTokens(eligible)
+ sendResponseCallback(Errors.NONE, tokens)
+ }
+ }
+ }
+
+ /**
+ * Token requests must arrive over a channel where the client is authenticated:
+ * they are rejected on PLAINTEXT, on 1-way SSL (anonymous principal), and for sessions
+ * that are themselves authenticated with a delegation token.
+ */
+ def allowTokenRequests(request: RequestChannel.Request): Boolean = {
+ val protocol = request.context.securityProtocol
+ val disallowed =
+ request.session.principal.tokenAuthenticated ||
+ protocol == SecurityProtocol.PLAINTEXT ||
+ // disallow requests from 1-way SSL
+ (protocol == SecurityProtocol.SSL && request.session.principal == KafkaPrincipal.ANONYMOUS)
+ !disallowed
+ }
+
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e24659c..0f0eb14 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -216,6 +216,11 @@ object Defaults {
val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
val SaslKerberosPrincipalToLocalRules = BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
+
+ /** ********* Delegation Token configuration ***********/
+ // Defaults, all in milliseconds: max lifetime 7 days, renew/expiry interval 1 day,
+ // expired-token scan every hour.
+ val DelegationTokenMaxLifeTimeMsDefault = 7 * 24 * 60 * 60 * 1000L
+ val DelegationTokenExpiryTimeMsDefault = 24 * 60 * 60 * 1000L
+ val DelegationTokenExpiryCheckIntervalMsDefault = 1 * 60 * 60 * 1000L
}
object KafkaConfig {
@@ -413,6 +418,12 @@ object KafkaConfig {
val SaslKerberosMinTimeBeforeReloginProp = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN
val SaslKerberosPrincipalToLocalRulesProp = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG
+ /** ********* Delegation Token Configuration ****************/
+ // Broker property names for the delegation token configs (documented below).
+ val DelegationTokenMasterKeyProp = "delegation.token.master.key"
+ val DelegationTokenMaxLifeTimeProp = "delegation.token.max.lifetime.ms"
+ val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms"
+ val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
+
/* Documentation */
/** ********* Zookeeper Configuration ***********/
val ZkConnectDoc = "Zookeeper host string"
@@ -684,6 +695,14 @@ object KafkaConfig {
val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
val SaslKerberosPrincipalToLocalRulesDoc = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
+ /** ********* Delegation Token Configuration ****************/
+ val DelegationTokenMasterKeyDoc = "Master/secret key to generate and verify delegation tokens. Same key must be configured across all the brokers. " +
+ " If the key is not set or set to empty string, brokers will disable the delegation token support."
+ val DelegationTokenMaxLifeTimeDoc = "The token has a maximum lifetime beyond which it cannot be renewed anymore. Default value 7 days."
+ // Fixed: this config is in milliseconds (property name ends in ".ms" and the default is
+ // 24*60*60*1000L ms), so the user-facing doc must not say "seconds".
+ val DelegationTokenExpiryTimeMsDoc = "The token validity time in milliseconds before the token needs to be renewed. Default value 1 day."
+ val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens."
+
+
private val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
@@ -884,6 +903,11 @@ object KafkaConfig {
.define(SaslKerberosTicketRenewJitterProp, DOUBLE, Defaults.SaslKerberosTicketRenewJitter, MEDIUM, SaslKerberosTicketRenewJitterDoc)
.define(SaslKerberosMinTimeBeforeReloginProp, LONG, Defaults.SaslKerberosMinTimeBeforeRelogin, MEDIUM, SaslKerberosMinTimeBeforeReloginDoc)
.define(SaslKerberosPrincipalToLocalRulesProp, LIST, Defaults.SaslKerberosPrincipalToLocalRules, MEDIUM, SaslKerberosPrincipalToLocalRulesDoc)
+ /** ********* Delegation Token Configuration ****************/
+ .define(DelegationTokenMasterKeyProp, PASSWORD, null, MEDIUM, DelegationTokenMasterKeyDoc)
+ .define(DelegationTokenMaxLifeTimeProp, LONG, Defaults.DelegationTokenMaxLifeTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenMaxLifeTimeDoc)
+ .define(DelegationTokenExpiryTimeMsProp, LONG, Defaults.DelegationTokenExpiryTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenExpiryTimeMsDoc)
+ .define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DelegationTokenExpiryCheckIntervalMsDefault, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc)
}
@@ -1093,6 +1117,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp)
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
+ /** ********* DelegationToken Configuration **************/
+ val delegationTokenMasterKey = getPassword(KafkaConfig.DelegationTokenMasterKeyProp)
+ // Token auth is implicitly enabled by configuring a non-empty master key.
+ val tokenAuthEnabled = (delegationTokenMasterKey != null && !delegationTokenMasterKey.value.isEmpty)
+ val delegationTokenMaxLifeMs = getLong(KafkaConfig.DelegationTokenMaxLifeTimeProp)
+ val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp)
+ val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
+
/** ********* Quota Configuration **************/
val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index df06c1a..355b741 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,6 +43,8 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Node}
@@ -119,10 +121,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
var replicaManager: ReplicaManager = null
var adminManager: AdminManager = null
+ var tokenManager: DelegationTokenManager = null
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
var dynamicConfigManager: DynamicConfigManager = null
var credentialProvider: CredentialProvider = null
+ var tokenCache: DelegationTokenCache = null
var groupCoordinator: GroupCoordinator = null
@@ -228,7 +232,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
- credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
+ tokenCache = new DelegationTokenCache(config.saslEnabledMechanisms)
+ credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, tokenCache)
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup()
@@ -243,8 +248,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// Now that the broker id is successfully registered, checkpoint it
checkpointBrokerId(config.brokerId)
+ /* start token manager */
+ tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
+ tokenManager.startup()
+
/* start kafka controller */
- kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, threadNamePrefix)
+ kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, tokenManager, threadNamePrefix)
kafkaController.startup()
adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
@@ -269,7 +278,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
- brokerTopicStats, clusterId, time)
+ brokerTopicStats, clusterId, time, tokenManager)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
@@ -555,6 +564,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (groupCoordinator != null)
CoreUtils.swallow(groupCoordinator.shutdown(), this)
+ if (tokenManager != null)
+ CoreUtils.swallow(tokenManager.shutdown(), this)
+
if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown(), this)
if (logManager != null)
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ac8b932..098670c 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -207,7 +207,7 @@ class ZkUtils(val zkClient: ZkClient,
val zkConnection: ZkConnection,
val isSecure: Boolean) extends Logging {
import ZkUtils._
-
+
// These are persistent ZK paths that should exist on kafka broker startup.
val persistentZkPaths = ZkData.PersistentZkPaths
@@ -723,7 +723,7 @@ class ZkUtils(val zkClient: ZkClient,
debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
}
-
+
def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
topics.foreach { topic =>
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 8179300..d683a8d 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -31,6 +31,7 @@ import kafka.server.ConfigType
import kafka.utils.Logging
import kafka.zookeeper._
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.data.{ACL, Stat}
@@ -1079,6 +1080,78 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
}
/**
+ * Creates the required zk nodes for Delegation Token storage
+ */
+ def createDelegationTokenPaths(): Unit = {
+ // Idempotent (throwIfPathExists = false), so this is safe to call on every broker startup.
+ createRecursive(DelegationTokenChangeNotificationZNode.path, throwIfPathExists = false)
+ createRecursive(DelegationTokensZNode.path, throwIfPathExists = false)
+ }
+
+ /**
+ * Creates a Delegation Token change notification message: a persistent-sequential znode
+ * whose payload is the token id, which other brokers watch to refresh their token caches.
+ * @param tokenId token Id
+ */
+ def createTokenChangeNotification(tokenId: String): Unit = {
+ val path = DelegationTokenChangeNotificationSequenceZNode.createPath
+ val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+ val createResponse = retryRequestUntilConnected(createRequest)
+ createResponse.resultException.foreach(e => throw e)
+ }
+
+ /**
+ * Sets or creates token info znode with the given token details depending on whether it already
+ * exists or not.
+ *
+ * @param token the token to set on the token znode
+ * @throws KeeperException if there is an error while setting or creating the znode
+ */
+ def setOrCreateDelegationToken(token: DelegationToken): Unit = {
+
+ def set(tokenData: Array[Byte]): SetDataResponse = {
+ val setDataRequest = SetDataRequest(DelegationTokenInfoZNode.path(token.tokenInfo().tokenId()), tokenData, ZkVersion.NoVersion)
+ retryRequestUntilConnected(setDataRequest)
+ }
+
+ def create(tokenData: Array[Byte]): CreateResponse = {
+ val path = DelegationTokenInfoZNode.path(token.tokenInfo().tokenId())
+ val createRequest = CreateRequest(path, tokenData, acls(path), CreateMode.PERSISTENT)
+ retryRequestUntilConnected(createRequest)
+ }
+
+ val tokenInfo = DelegationTokenInfoZNode.encode(token)
+ // Optimistically try a set first (common case: updating an existing token on renew/expire);
+ // fall back to create only when the znode does not exist yet.
+ val setDataResponse = set(tokenInfo)
+ setDataResponse.resultCode match {
+ case Code.NONODE =>
+ val createDataResponse = create(tokenInfo)
+ createDataResponse.maybeThrow
+ case _ => setDataResponse.maybeThrow
+ }
+ }
+
+ /**
+ * Gets the Delegation Token Info
+ * @param delegationTokenId id of the token whose info znode is read
+ * @return optional TokenInfo that is Some if the token znode exists and can be parsed and None otherwise.
+ */
+ def getDelegationTokenInfo(delegationTokenId: String): Option[TokenInformation] = {
+ val getDataRequest = GetDataRequest(DelegationTokenInfoZNode.path(delegationTokenId))
+ val getDataResponse = retryRequestUntilConnected(getDataRequest)
+ getDataResponse.resultCode match {
+ case Code.OK => DelegationTokenInfoZNode.decode(getDataResponse.data)
+ case Code.NONODE => None
+ case _ => throw getDataResponse.resultException.get
+ }
+ }
+
+ /**
+ * Deletes the given Delegation token node
+ * @param delegationTokenId id of the token znode to delete
+ * @return delete status as reported by deleteRecursive
+ */
+ def deleteDelegationToken(delegationTokenId: String): Boolean = {
+ deleteRecursive(DelegationTokenInfoZNode.path(delegationTokenId))
+ }
+
+ /**
* This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest, which allows data
* watcher registrations on paths which might not even exist.
*
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 2c86c2c..99fe591 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -23,13 +23,14 @@ import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.common.KafkaException
import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
-import kafka.security.auth.{Acl, Resource}
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
-import kafka.server.ConfigType
+import kafka.security.auth.{Acl, Resource}
+import kafka.server.{ConfigType, DelegationTokenManager}
import kafka.utils.Json
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.{ACL, Stat}
@@ -490,6 +491,32 @@ object ProducerIdBlockZNode {
def path = "/latest_producer_id_block"
}
+// ZK layout for delegation tokens:
+//   /delegation_token                       -> root (DelegationTokenAuthZNode)
+//   /delegation_token/token_changes         -> change notifications
+//   /delegation_token/tokens/<tokenId>      -> token info (sensitive path)
+object DelegationTokenAuthZNode {
+ def path = "/delegation_token"
+}
+
+object DelegationTokenChangeNotificationZNode {
+ def path = s"${DelegationTokenAuthZNode.path}/token_changes"
+}
+
+// Sequential notification nodes named token_change_<seq>; the payload is the UTF-8 token id.
+object DelegationTokenChangeNotificationSequenceZNode {
+ val SequenceNumberPrefix = "token_change_"
+ def createPath = s"${DelegationTokenChangeNotificationZNode.path}/$SequenceNumberPrefix"
+ def deletePath(sequenceNode: String) = s"${DelegationTokenChangeNotificationZNode.path}/${sequenceNode}"
+ def encode(tokenId : String): Array[Byte] = tokenId.getBytes(UTF_8)
+ def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+}
+
+object DelegationTokensZNode {
+ def path = s"${DelegationTokenAuthZNode.path}/tokens"
+}
+
+// Token info znodes store the JSON produced/parsed by DelegationTokenManager.
+object DelegationTokenInfoZNode {
+ def path(tokenId: String) = s"${DelegationTokensZNode.path}/$tokenId"
+ def encode(token: DelegationToken): Array[Byte] = Json.encodeAsBytes(DelegationTokenManager.toJsonCompatibleMap(token).asJava)
+ def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes)
+}
+
object ZkData {
// Important: it is necessary to add any new top level Zookeeper path to the Seq
@@ -503,11 +530,12 @@ object ZkData {
AclZNode.path,
AclChangeNotificationZNode.path,
ProducerIdBlockZNode.path,
- LogDirEventNotificationZNode.path)
+ LogDirEventNotificationZNode.path,
+ DelegationTokenAuthZNode.path)
// These are persistent ZK paths that should exist on kafka broker startup.
val PersistentZkPaths = Seq(
- "/consumers", // old consumer path
+ "/consumers", // old consumer path
BrokerIdsZNode.path,
TopicsZNode.path,
ConfigEntityChangeNotificationZNode.path,
@@ -518,7 +546,7 @@ object ZkData {
LogDirEventNotificationZNode.path
) ++ ConfigType.all.map(ConfigEntityTypeZNode.path)
- val SensitiveRootPaths = Seq(ConfigEntityTypeZNode.path(ConfigType.User))
+ val SensitiveRootPaths = Seq(ConfigEntityTypeZNode.path(ConfigType.User), DelegationTokensZNode.path)
def sensitivePath(path: String): Boolean = {
path != null && SensitiveRootPaths.exists(path.startsWith)
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..efc5049
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util
+
+import kafka.admin.AdminClient
+import kafka.server.KafkaConfig
+import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramMechanism
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+import org.junit.Before
+
+import scala.collection.JavaConverters._
+
+class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
+
+ val kafkaClientSaslMechanism = "SCRAM-SHA-256"
+ val kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
+ override protected def securityProtocol = SecurityProtocol.SASL_SSL
+ override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+ override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+ override val clientPrincipal = JaasTestUtils.KafkaScramUser
+ private val clientPassword = JaasTestUtils.KafkaScramPassword
+
+ override val kafkaPrincipal = JaasTestUtils.KafkaScramAdmin
+ private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
+
+ this.serverConfig.setProperty(KafkaConfig.DelegationTokenMasterKeyProp, "testKey")
+
+ override def configureSecurityBeforeServersStart() {
+ super.configureSecurityBeforeServersStart()
+ zkClient.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
+ // Create broker admin credentials before starting brokers
+ createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword)
+ }
+
+ override def configureSecurityAfterServersStart() {
+ super.configureSecurityAfterServersStart()
+
+ // create scram credential for user "scram-user"
+ createScramCredentials(zkConnect, clientPrincipal, clientPassword)
+
+ //create a token with "scram-user" credentials
+ val token = createDelegationToken()
+
+ // pass token to client jaas config
+ val clientLoginContext = JaasTestUtils.tokenClientLoginModule(token.tokenInfo().tokenId(), token.hmacAsBase64String())
+ producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+ consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+ }
+
+ @Before
+ override def setUp() {
+ startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
+ super.setUp()
+ }
+
+ private def createDelegationToken(): DelegationToken = {
+ val config = new util.HashMap[String, Object]
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+ securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+ val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
+ config.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+
+ val adminClient = AdminClient.create(config.asScala.toMap)
+ var (error, token) = adminClient.createToken(List())
+
+ //wait for token to reach all the brokers
+ TestUtils.waitUntilTrue(() => servers.forall(server => !server.tokenCache.tokens().isEmpty),
+ "Timed out waiting for token to propagate to all servers")
+ adminClient.close()
+
+ token
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 7f71faf..9b62728 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -38,12 +38,14 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2"))
private val TransactionalIdResources = Set(new Resource(TransactionalId, "t0"), new Resource(TransactionalId, "t1"))
+ private val TokenResources = Set(new Resource(DelegationToken, "token1"), new Resource(DelegationToken, "token2"))
private val ResourceToCommand = Map[Set[Resource], Array[String]](
TopicResources -> Array("--topic", "test-1", "--topic", "test-2"),
Set(Resource.ClusterResource) -> Array("--cluster"),
GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2"),
- TransactionalIdResources -> Array("--transactional-id", "t0", "--transactional-id", "t1")
+ TransactionalIdResources -> Array("--transactional-id", "t0", "--transactional-id", "t1"),
+ TokenResources -> Array("--delegation-token", "token1", "--delegation-token", "token2")
)
private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
@@ -54,7 +56,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
Array("--operation", "Create", "--operation", "ClusterAction", "--operation", "DescribeConfigs",
"--operation", "AlterConfigs", "--operation", "IdempotentWrite")),
GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe")),
- TransactionalIdResources -> (Set(Describe, Write), Array("--operation", "Describe", "--operation", "Write"))
+ TransactionalIdResources -> (Set(Describe, Write), Array("--operation", "Describe", "--operation", "Write")),
+ TokenResources -> (Set(Describe), Array("--operation", "Describe"))
)
private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]](
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 9c09a43..662d6d2 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -61,6 +61,12 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
*/
def configureSecurityBeforeServersStart() {}
+ /**
+ * Override this in case tokens or security credentials need to be created after `servers` are started.
+ * The default implementation of this method is a no-op.
+ */
+ def configureSecurityAfterServersStart() {}
+
def configs: Seq[KafkaConfig] = {
if (instanceConfigs == null)
instanceConfigs = generateConfigs
@@ -94,6 +100,9 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
brokerList = TestUtils.bootstrapServers(servers, listenerName)
alive = new Array[Boolean](servers.length)
Arrays.fill(alive, true)
+
+ // default implementation is a no-op, it is overridden by subclasses if required
+ configureSecurityAfterServersStart()
}
@After
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index f5c9f03..13299c7 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -61,7 +61,7 @@ class SocketServerTest extends JUnitSuite {
props.put("connections.max.idle.ms", "60000")
val config = KafkaConfig.fromProps(props)
val metrics = new Metrics
- val credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
+ val credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, null)
val localAddress = InetAddress.getLoopbackAddress
// Clean-up any metrics left around by previous tests
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
new file mode 100644
index 0000000..8c03548
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.token.delegation
+
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.util.Properties
+
+import kafka.network.RequestChannel.Session
+import kafka.security.auth.Acl.WildCardHost
+import kafka.security.auth._
+import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.scram.ScramMechanism
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.utils.{MockTime, SecurityUtils}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+
+class DelegationTokenManagerTest extends ZooKeeperTestHarness {
+
+ val time = new MockTime()
+ val owner = SecurityUtils.parseKafkaPrincipal("User:owner")
+ val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer1"))
+
+ val masterKey = "masterKey"
+ val maxLifeTimeMsDefault = Defaults.DelegationTokenMaxLifeTimeMsDefault
+ val renewTimeMsDefault = Defaults.DelegationTokenExpiryTimeMsDefault
+ var tokenCache: DelegationTokenCache = null
+ var props: Properties = null
+
+ var createTokenResult: CreateTokenResult = _
+ var error: Errors = Errors.NONE
+ var expiryTimeStamp: Long = 0
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ props = TestUtils.createBrokerConfig(0, zkConnect, enableToken = true)
+ props.put(KafkaConfig.SaslEnabledMechanismsProp, ScramMechanism.mechanismNames().asScala.mkString(","))
+ props.put(KafkaConfig.DelegationTokenMasterKeyProp, masterKey)
+ tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames())
+ }
+
+ @Test
+ def testTokenRequestsWithDelegationTokenDisabled(): Unit = {
+ val props: Properties = TestUtils.createBrokerConfig(0, zkConnect)
+ val config = KafkaConfig.fromProps(props)
+ val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+
+ tokenManager.createToken(owner, renewer, -1, createTokenResultCallBack)
+ assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createTokenResult.error)
+ assert(Array[Byte]() sameElements createTokenResult.hmac)
+
+ tokenManager.renewToken(owner, ByteBuffer.wrap("test".getBytes), 1000000, renewResponseCallback)
+ assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, error)
+
+ tokenManager.expireToken(owner, ByteBuffer.wrap("test".getBytes), 1000000, renewResponseCallback)
+ assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, error)
+ }
+
+ @Test
+ def testCreateToken(): Unit = {
+ val config = KafkaConfig.fromProps(props)
+ val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ tokenManager.startup
+
+ tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
+ val issueTime = time.milliseconds
+ val tokenId = createTokenResult.tokenId
+ val password = DelegationTokenManager.createHmac(tokenId, masterKey)
+ assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault, issueTime + maxLifeTimeMsDefault, tokenId, password, Errors.NONE), createTokenResult)
+
+ val token = tokenManager.getToken(tokenId)
+ assertTrue(!token.isEmpty )
+ assertTrue(password sameElements token.get.hmac)
+ }
+
+ @Test
+ def testRenewToken(): Unit = {
+ val config = KafkaConfig.fromProps(props)
+ val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ tokenManager.startup
+
+ tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
+ val issueTime = time.milliseconds
+ val maxLifeTime = issueTime + maxLifeTimeMsDefault
+ val tokenId = createTokenResult.tokenId
+ val password = DelegationTokenManager.createHmac(tokenId, masterKey)
+ assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault, maxLifeTime, tokenId, password, Errors.NONE), createTokenResult)
+
+ //try renewing non-existing token
+ tokenManager.renewToken(owner, ByteBuffer.wrap("test".getBytes), -1 , renewResponseCallback)
+ assertEquals(Errors.DELEGATION_TOKEN_NOT_FOUND, error)
+
+ //try renew non-owned tokens
+ val unknownOwner = SecurityUtils.parseKafkaPrincipal("User:Unknown")
+ tokenManager.renewToken(unknownOwner, ByteBuffer.wrap(password), -1 , renewResponseCallback)
+ assertEquals(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, error)
+
+ // try renew with default time period
+ time.sleep(24 * 60 * 60 * 1000L)
+ var expectedExpiryStamp = time.milliseconds + renewTimeMsDefault
+ tokenManager.renewToken(owner, ByteBuffer.wrap(password), -1 , renewResponseCallback)
+ assertEquals(expectedExpiryStamp, expiryTimeStamp)
+ assertEquals(Errors.NONE, error)
+
+ // try renew with specific time period
+ time.sleep(24 * 60 * 60 * 1000L)
+ expectedExpiryStamp = time.milliseconds + 1 * 60 * 60 * 1000L
+ tokenManager.renewToken(owner, ByteBuffer.wrap(password), 1 * 60 * 60 * 1000L , renewResponseCallback)
+ assertEquals(expectedExpiryStamp, expiryTimeStamp)
+ assertEquals(Errors.NONE, error)
+
+ //try renewing more than max time period
+ time.sleep( 1 * 60 * 60 * 1000L)
+ tokenManager.renewToken(owner, ByteBuffer.wrap(password), 8 * 24 * 60 * 60 * 1000L, renewResponseCallback)
+ assertEquals(maxLifeTime, expiryTimeStamp)
+ assertEquals(Errors.NONE, error)
+
+ //try renewing expired token
+ time.sleep(8 * 24 * 60 * 60 * 1000L)
+ tokenManager.renewToken(owner, ByteBuffer.wrap(password), -1 , renewResponseCallback)
+ assertEquals(Errors.DELEGATION_TOKEN_EXPIRED, error)
+ }
+
+ @Test
+ def testExpireToken(): Unit = {
+ val config = KafkaConfig.fromProps(props)
+ val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ tokenManager.startup
+
+ tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
+ val issueTime = time.milliseconds
+ val tokenId = createTokenResult.tokenId
+ val password = DelegationTokenManager.createHmac(tokenId, masterKey)
+ assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault, issueTime + maxLifeTimeMsDefault, tokenId, password, Errors.NONE), createTokenResult)
+
+ //try expire non-existing token
+ tokenManager.expireToken(owner, ByteBuffer.wrap("test".getBytes), -1 , renewResponseCallback)
+ assertEquals(Errors.DELEGATION_TOKEN_NOT_FOUND, error)
+
+ //try expire non-owned tokens
+ val unknownOwner = SecurityUtils.parseKafkaPrincipal("User:Unknown")
+ tokenManager.expireToken(unknownOwner, ByteBuffer.wrap(password), -1 , renewResponseCallback)
+ assertEquals(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, error)
+
+ //try expire token at a timestamp
+ time.sleep(24 * 60 * 60 * 1000L)
+ val expectedExpiryStamp = time.milliseconds + 2 * 60 * 60 * 1000L
+ tokenManager.expireToken(owner, ByteBuffer.wrap(password), 2 * 60 * 60 * 1000L, renewResponseCallback)
+ assertEquals(expectedExpiryStamp, expiryTimeStamp)
+
+ //try expire token immediately
+ time.sleep(1 * 60 * 60 * 1000L)
+ tokenManager.expireToken(owner, ByteBuffer.wrap(password), -1, renewResponseCallback)
+ assert(tokenManager.getToken(tokenId).isEmpty)
+ assertEquals(Errors.NONE, error)
+ assertEquals(time.milliseconds, expiryTimeStamp)
+ }
+
+ @Test
+ def testDescribeToken(): Unit = {
+
+ val config = KafkaConfig.fromProps(props)
+
+ val owner1 = SecurityUtils.parseKafkaPrincipal("User:owner1")
+ val owner2 = SecurityUtils.parseKafkaPrincipal("User:owner2")
+ val owner3 = SecurityUtils.parseKafkaPrincipal("User:owner3")
+ val owner4 = SecurityUtils.parseKafkaPrincipal("User:owner4")
+
+ val renewer1 = SecurityUtils.parseKafkaPrincipal("User:renewer1")
+ val renewer2 = SecurityUtils.parseKafkaPrincipal("User:renewer2")
+ val renewer3 = SecurityUtils.parseKafkaPrincipal("User:renewer3")
+ val renewer4 = SecurityUtils.parseKafkaPrincipal("User:renewer4")
+
+ val simpleAclAuthorizer = new SimpleAclAuthorizer
+ simpleAclAuthorizer.configure(config.originals)
+
+ var hostSession = new Session(owner1, InetAddress.getByName("192.168.1.1"))
+
+ val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ tokenManager.startup
+
+ //create tokens
+ tokenManager.createToken(owner1, List(renewer1, renewer2), 1 * 60 * 60 * 1000L, createTokenResultCallBack)
+
+ tokenManager.createToken(owner2, List(renewer3), 1 * 60 * 60 * 1000L, createTokenResultCallBack)
+ val tokenId2 = createTokenResult.tokenId
+
+ tokenManager.createToken(owner3, List(renewer4), 2 * 60 * 60 * 1000L, createTokenResultCallBack)
+ val tokenId3 = createTokenResult.tokenId
+
+ tokenManager.createToken(owner4, List(owner1, renewer4), 2 * 60 * 60 * 1000L, createTokenResultCallBack)
+
+ assert(tokenManager.getAllTokenInformation().size == 4 )
+
+ //get tokens for a non-existing owner
+ var tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(SecurityUtils.parseKafkaPrincipal("User:unknown")))
+ assert(tokens.size == 0)
+
+ //get all tokens for empty owner list
+ tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List())
+ assert(tokens.size == 0)
+
+ //get all tokens for owner1
+ tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1))
+ assert(tokens.size == 2)
+
+ //get all tokens for owner1 with a null owner list (all owned tokens)
+ tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, null)
+ assert(tokens.size == 2)
+
+ //get all tokens for unknown owner
+ tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, SecurityUtils.parseKafkaPrincipal("User:unknown"), null)
+ assert(tokens.size == 0)
+
+ //get all tokens for multiple owners (owner1, renewer4) and without permission for renewer4
+ tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1, renewer4))
+ assert(tokens.size == 2)
+
+ //get all tokens for multiple owners (owner1, renewer4) and with permission
+ var acl = new Acl(owner1, Allow, WildCardHost, Describe)
+ simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId3))
+ tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1, renewer4))
+ assert(tokens.size == 3)
+
+ //get all tokens for renewer4 which is a renewer principal for some tokens
+ tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, renewer4, List(renewer4))
+ assert(tokens.size == 2)
+
+ //get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and without permissions for renewer3
+ tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, renewer2, List(renewer2, renewer3))
+ assert(tokens.size == 1)
+
+ //get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and with permissions
+ hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1"))
+ acl = new Acl(renewer2, Allow, WildCardHost, Describe)
+ simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId2))
+ tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, renewer2, List(renewer2, renewer3))
+ assert(tokens.size == 2)
+
+ simpleAclAuthorizer.close()
+ }
+
+ private def getTokens(tokenManager: DelegationTokenManager, simpleAclAuthorizer: SimpleAclAuthorizer, hostSession: Session,
+ requestPrincipal: KafkaPrincipal, requestedOwners: List[KafkaPrincipal]): List[DelegationToken] = {
+
+ if (requestedOwners != null && requestedOwners.isEmpty) {
+ List()
+ }
+ else {
+ def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId))
+ def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, Option(requestedOwners), token, authorizeToken)
+ tokenManager.getTokens(eligible)
+ }
+ }
+
+ @Test
+ def testPeriodicTokenExpiry(): Unit = {
+ val config = KafkaConfig.fromProps(props)
+ val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ tokenManager.startup
+
+ //create tokens
+ tokenManager.createToken(owner, renewer, 1 * 60 * 60 * 1000L, createTokenResultCallBack)
+ tokenManager.createToken(owner, renewer, 1 * 60 * 60 * 1000L, createTokenResultCallBack)
+ tokenManager.createToken(owner, renewer, 2 * 60 * 60 * 1000L, createTokenResultCallBack)
+ tokenManager.createToken(owner, renewer, 2 * 60 * 60 * 1000L, createTokenResultCallBack)
+ assert(tokenManager.getAllTokenInformation().size == 4 )
+
+ time.sleep(2 * 60 * 60 * 1000L)
+ tokenManager.expireTokens()
+ assert(tokenManager.getAllTokenInformation().size == 2 )
+
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ super.tearDown()
+ }
+
+ private def createTokenResultCallBack(ret: CreateTokenResult): Unit = {
+ createTokenResult = ret
+ }
+
+ private def renewResponseCallback(ret: Errors, timeStamp: Long): Unit = {
+ error = ret
+ expiryTimeStamp = timeStamp
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
new file mode 100644
index 0000000..4c42dd2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.nio.ByteBuffer
+import java.util
+
+import kafka.admin.AdminClient
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.protocol.Errors
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+
+class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
+ var adminClient: AdminClient = null
+
+ override def numBrokers = 1
+
+ @Before
+ override def setUp(): Unit = {
+ super.setUp()
+ }
+
+ def createAdminConfig():util.Map[String, Object] = {
+ val config = new util.HashMap[String, Object]
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+ securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+ config
+ }
+
+ @Test
+ def testDelegationTokenRequests(): Unit = {
+ adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
+
+ val createResponse = adminClient.createToken(List())
+ assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, createResponse._1)
+
+ val describeResponse = adminClient.describeToken(List())
+ assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, describeResponse._1)
+
+ //test renewing tokens
+ val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
+ assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, renewResponse._1)
+
+ //test expiring tokens
+ val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
+ assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, expireResponse._1)
+ }
+
+
+ @After
+ override def tearDown(): Unit = {
+ if (adminClient != null)
+ adminClient.close()
+ super.tearDown()
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
new file mode 100644
index 0000000..55bf5fd
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+
+import kafka.admin.AdminClient
+import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.SecurityUtils
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+
+class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
+ override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+ private val kafkaClientSaslMechanism = "PLAIN"
+ private val kafkaServerSaslMechanisms = List("PLAIN")
+ protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+ protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+ var adminClient: AdminClient = null
+
+ override def numBrokers = 1
+
+ @Before
+ override def setUp(): Unit = {
+ startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+ super.setUp()
+ }
+
+ def createAdminConfig():util.Map[String, Object] = {
+ val config = new util.HashMap[String, Object]
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+ securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+ config
+ }
+
+ override def generateConfigs = {
+ val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+ enableControlledShutdown = false, enableDeleteTopic = true,
+ interBrokerSecurityProtocol = Some(securityProtocol),
+ trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
+ props.foreach(propertyOverrides)
+ props.map(KafkaConfig.fromProps)
+ }
+
+ @Test
+ def testDelegationTokenRequests(): Unit = {
+ adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
+
+ // test creating token
+ val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
+ val tokenResult1 = adminClient.createToken(renewer1)
+ assertEquals(Errors.NONE, tokenResult1._1)
+ var token1 = adminClient.describeToken(null)._2.head
+ assertEquals(token1, tokenResult1._2)
+
+ //test renewing tokens
+ val renewResponse = adminClient.renewToken(token1.hmacBuffer())
+ assertEquals(Errors.NONE, renewResponse._1)
+
+ token1 = adminClient.describeToken(null)._2.head
+ assertEquals(renewResponse._2, token1.tokenInfo().expiryTimestamp())
+
+ //test describe tokens
+ val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:Renewer1"))
+ val tokenResult2 = adminClient.createToken(renewer2)
+ assertEquals(Errors.NONE, tokenResult2._1)
+ val token2 = tokenResult2._2
+
+ assertTrue(adminClient.describeToken(null)._2.size == 2)
+
+ //test expiring tokens
+ val expireResponse1 = adminClient.expireToken(token1.hmacBuffer())
+ assertEquals(Errors.NONE, expireResponse1._1)
+
+ val expireResponse2 = adminClient.expireToken(token2.hmacBuffer())
+ assertEquals(Errors.NONE, expireResponse2._1)
+
+ assertTrue(adminClient.describeToken(null)._2.size == 0)
+
+ //create token with invalid principal type
+ val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer1"))
+ val tokenResult3 = adminClient.createToken(renewer3)
+ assertEquals(Errors.INVALID_PRINCIPAL_TYPE, tokenResult3._1)
+
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ if (adminClient != null)
+ adminClient.close()
+ super.tearDown()
+ closeSasl()
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
new file mode 100644
index 0000000..0561cac
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.nio.ByteBuffer
+import java.util
+
+import kafka.admin.AdminClient
+import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.SecurityUtils
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+
+import scala.collection.JavaConverters._
+
+class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest with SaslSetup {
+ override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+ private val kafkaClientSaslMechanism = "PLAIN"
+ private val kafkaServerSaslMechanisms = List("PLAIN")
+ protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+ protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+ var adminClient: AdminClient = null
+
+ override def numBrokers = 1
+
+ @Before
+ override def setUp(): Unit = {
+ startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+ super.setUp()
+ }
+
+ def createAdminConfig():util.Map[String, Object] = {
+ val config = new util.HashMap[String, Object]
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+ securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+ config
+ }
+
+ @Test
+ def testDelegationTokenRequests(): Unit = {
+ adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
+
+ val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
+ val createResponse = adminClient.createToken(renewer1)
+ assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createResponse._1)
+
+ val describeResponse = adminClient.describeToken(List())
+ assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, describeResponse._1)
+
+ //test renewing tokens
+ val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
+ assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, renewResponse._1)
+
+ //test expire tokens
+ val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
+ assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, expireResponse._1)
+
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ if (adminClient != null)
+ adminClient.close()
+ super.tearDown()
+ closeSasl()
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a85a10b..8e907d9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -98,7 +98,8 @@ class KafkaApisTest {
quotas,
brokerTopicStats,
clusterId,
- time
+ time,
+ null
)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 9c459d8..748efec 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -674,6 +674,12 @@ class KafkaConfigTest {
case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
+
+ //delegation token configs
+ case KafkaConfig.DelegationTokenMasterKeyProp => // ignore
+ case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+ case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+ case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})
@@ -718,6 +724,14 @@ class KafkaConfigTest {
assertEquals(123L, config.logFlushIntervalMs)
assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
+ assertEquals(false, config.tokenAuthEnabled)
+ assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs)
+ assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
+ assertEquals(1 * 60L * 1000L * 60, config.delegationTokenExpiryCheckIntervalMs)
+
+ defaults.put(KafkaConfig.DelegationTokenMasterKeyProp, "1234567890")
+ val config1 = KafkaConfig.fromProps(defaults)
+ assertEquals(true, config1.tokenAuthEnabled)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 886c318..a90fa64 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
import org.apache.kafka.common.utils.Sanitizer
+import org.apache.kafka.common.utils.SecurityUtils
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -305,6 +306,18 @@ class RequestQuotaTest extends BaseRequestTest {
Collections.singletonMap("topic-2", NewPartitions.increaseTo(1)), 0, false
)
+ case ApiKeys.CREATE_DELEGATION_TOKEN =>
+ new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000)
+
+ case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
+ new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+
+ case ApiKeys.DESCRIBE_DELEGATION_TOKEN=>
+ new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
+
+ case ApiKeys.RENEW_DELEGATION_TOKEN=>
+ new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
@@ -399,6 +412,10 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.ALTER_REPLICA_LOG_DIRS => new AlterReplicaLogDirsResponse(response).throttleTimeMs
case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs
case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
+ case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response).throttleTimeMs
+ case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
+ case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
+ case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
}
}
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 1f9ccf3..9ce3b01 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -72,14 +72,15 @@ object JaasTestUtils {
case class ScramLoginModule(username: String,
password: String,
- debug: Boolean = false) extends JaasModule {
+ debug: Boolean = false,
+ tokenProps: Map[String, String] = Map.empty) extends JaasModule {
def name = "org.apache.kafka.common.security.scram.ScramLoginModule"
def entries: Map[String, String] = Map(
"username" -> username,
"password" -> password
- )
+ ) ++ tokenProps.map { case (name, value) => name -> value }
}
sealed trait JaasModule {
@@ -157,6 +158,16 @@ object JaasTestUtils {
def clientLoginModule(mechanism: String, keytabLocation: Option[File]): String =
kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString
+ def tokenClientLoginModule(tokenId: String, password: String): String = {
+ ScramLoginModule(
+ tokenId,
+ password,
+ debug = false,
+ Map(
+ "tokenauth" -> "true"
+ )).toString
+ }
+
def zkSections: Seq[JaasSection] = Seq(
JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false,
Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2c2d9dd..09e4c94 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -158,11 +158,12 @@ object TestUtils extends Logging {
enableSaslPlaintext: Boolean = false,
enableSaslSsl: Boolean = false,
rackInfo: Map[Int, String] = Map(),
- logDirCount: Int = 1): Seq[Properties] = {
+ logDirCount: Int = 1,
+ enableToken: Boolean = false): Seq[Properties] = {
(0 until numConfigs).map { node =>
createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort,
interBrokerSecurityProtocol, trustStoreFile, saslProperties, enablePlaintext = enablePlaintext, enableSsl = enableSsl,
- enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node), logDirCount = logDirCount)
+ enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node), logDirCount = logDirCount, enableToken = enableToken)
}
}
@@ -213,7 +214,8 @@ object TestUtils extends Logging {
enableSaslSsl: Boolean = false,
saslSslPort: Int = RandomPort,
rack: Option[String] = None,
- logDirCount: Int = 1): Properties = {
+ logDirCount: Int = 1,
+ enableToken: Boolean = false): Properties = {
def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
val protocolAndPorts = ArrayBuffer[(SecurityProtocol, Int)]()
@@ -270,6 +272,9 @@ object TestUtils extends Logging {
props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name)
}
+ if (enableToken)
+ props.put(KafkaConfig.DelegationTokenMasterKeyProp, "masterkey")
+
props
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index f3b8e81..d3726c2 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -28,10 +28,15 @@ import kafka.utils.CoreUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.token.delegation.TokenInformation
+import org.apache.kafka.common.utils.SecurityUtils
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.junit.Assert._
import org.junit.Test
+import scala.collection.JavaConverters._
+import scala.util.Random
+
class KafkaZkClientTest extends ZooKeeperTestHarness {
private val group = "my-group"
@@ -472,4 +477,39 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
data.map(new String(_, UTF_8))
}
+ @Test
+ def testDelegationTokenMethods() {
+ assertFalse(zkClient.pathExists(DelegationTokensZNode.path))
+ assertFalse(zkClient.pathExists(DelegationTokenChangeNotificationZNode.path))
+
+ zkClient.createDelegationTokenPaths
+ assertTrue(zkClient.pathExists(DelegationTokensZNode.path))
+ assertTrue(zkClient.pathExists(DelegationTokenChangeNotificationZNode.path))
+
+ val tokenId = "token1"
+ val owner = SecurityUtils.parseKafkaPrincipal("User:owner1")
+ val renewers = List(SecurityUtils.parseKafkaPrincipal("User:renewer1"), SecurityUtils.parseKafkaPrincipal("User:renewer1"))
+
+ val tokenInfo = new TokenInformation(tokenId, owner, renewers.asJava,
+ System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis())
+ val bytes = new Array[Byte](20)
+ Random.nextBytes(bytes)
+ val token = new org.apache.kafka.common.security.token.delegation.DelegationToken(tokenInfo, bytes)
+
+ // test non-existent token
+ assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+
+ // create a token
+ zkClient.setOrCreateDelegationToken(token)
+
+ //get created token
+ assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+ //update expiryTime
+ tokenInfo.setExpiryTimestamp(System.currentTimeMillis())
+ zkClient.setOrCreateDelegationToken(token)
+
+ //test updated token
+ assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+ }
}
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].