You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/04/11 17:48:09 UTC
[kafka] branch trunk updated: KAFKA-6447: Add Delegation Token
Operations to KafkaAdminClient (KIP-249) (#4427)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 47918f2 KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427)
47918f2 is described below
commit 47918f2d79e907f6a6da599ab82a97c169722229
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Wed Apr 11 23:18:04 2018 +0530
KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427)
Reviewers: Jun Rao <ju...@gmail.com>
---
build.gradle | 1 +
checkstyle/suppressions.xml | 2 +-
.../apache/kafka/clients/admin/AdminClient.java | 154 +++++++++++++++++++++
.../admin/CreateDelegationTokenOptions.java | 53 +++++++
.../clients/admin/CreateDelegationTokenResult.java | 43 ++++++
.../admin/DescribeDelegationTokenOptions.java | 48 +++++++
.../admin/DescribeDelegationTokenResult.java | 45 ++++++
.../admin/ExpireDelegationTokenOptions.java} | 26 ++--
.../admin/ExpireDelegationTokenResult.java} | 29 ++--
.../kafka/clients/admin/KafkaAdminClient.java | 137 ++++++++++++++++++
.../admin/RenewDelegationTokenOptions.java} | 26 ++--
.../admin/RenewDelegationTokenResult.java} | 29 ++--
.../kafka/common/network/ChannelBuilders.java | 2 +-
.../kafka/common/network/SaslChannelBuilder.java | 2 +-
.../requests/DescribeDelegationTokenResponse.java | 4 +
.../requests/ExpireDelegationTokenRequest.java | 4 +-
.../requests/ExpireDelegationTokenResponse.java | 4 +
.../requests/RenewDelegationTokenRequest.java | 4 +-
.../requests/RenewDelegationTokenResponse.java | 4 +
.../security/scram/internal/ScramSaslServer.java | 2 +-
.../scram/internal/ScramServerCallbackHandler.java | 4 +-
.../security/token/delegation/DelegationToken.java | 11 +-
.../token/delegation/TokenInformation.java | 6 +
.../{ => internal}/DelegationTokenCache.java | 4 +-
.../DelegationTokenCredentialCallback.java | 2 +-
.../kafka/clients/admin/MockAdminClient.java | 20 +++
.../apache/kafka/common/network/NioEchoServer.java | 2 +-
.../kafka/common/requests/RequestResponseTest.java | 4 +-
.../scram/internal/ScramSaslServerTest.java | 2 +-
core/src/main/scala/kafka/admin/AdminClient.scala | 29 ----
.../scala/kafka/admin/DelegationTokenCommand.scala | 88 ++++++------
.../scala/kafka/security/CredentialProvider.scala | 2 +-
.../kafka/server/DelegationTokenManager.scala | 3 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../DelegationTokenEndToEndAuthorizationTest.scala | 8 +-
.../kafka/admin/DelegationTokenCommandTest.scala | 147 ++++++++++++++++++++
.../delegation/DelegationTokenManagerTest.scala | 3 +-
.../DelegationTokenRequestsOnPlainTextTest.scala | 27 ++--
.../kafka/server/DelegationTokenRequestsTest.scala | 102 ++++++++------
...nTokenRequestsWithDisableTokenFeatureTest.scala | 32 ++---
.../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +-
41 files changed, 907 insertions(+), 214 deletions(-)
diff --git a/build.gradle b/build.gradle
index f836980..69f560e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -858,6 +858,7 @@ project(':clients') {
include "**/org/apache/kafka/common/config/*"
include "**/org/apache/kafka/common/security/auth/*"
include "**/org/apache/kafka/server/policy/*"
+ include "**/org/apache/kafka/common/security/token/delegation/*"
}
}
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0fec810..2767132 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -10,7 +10,7 @@
<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
- files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
+ files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/>
<suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
<suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 897e127..53b77ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -535,4 +535,158 @@ public abstract class AdminClient implements AutoCloseable {
*/
public abstract DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
DeleteRecordsOptions options);
+
+ /**
+ * <p>Create a Delegation Token.</p>
+ *
+ * <p>This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options.
+ * See the overload for more details.</p>
+ *
+ * @return The CreateDelegationTokenResult.
+ */
+ public CreateDelegationTokenResult createDelegationToken() {
+ return createDelegationToken(new CreateDelegationTokenOptions());
+ }
+
+
+ /**
+ * <p>Create a Delegation Token.</p>
+ *
+ * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+ * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@code CreateDelegationTokenResult}</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
+ * if the renewers principal type is not supported.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
+ * </ul>
+ *
+ * @param options The options to use when creating delegation token.
+ * @return The DeleteRecordsResult.
+ */
+ public abstract CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
+
+
+ /**
+ * <p>Renew a Delegation Token.</p>
+ *
+ * <p>This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options.
+ * See the overload for more details.</p>
+ *
+ *
+ * @param hmac HMAC of the Delegation token
+ * @return The RenewDelegationTokenResult.
+ */
+ public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
+ return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
+ }
+
+ /**
+ * <p> Renew a Delegation Token.</p>
+ *
+ * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+ * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code RenewDelegationTokenResult}</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+ * if the delegation token is not found on server.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+ * if the authenticated user is not owner/renewer of the token.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+ * if the delegation token is expired.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
+ * </ul>
+ *
+ * @param hmac HMAC of the Delegation token
+ * @param options The options to use when renewing delegation token.
+ * @return The RenewDelegationTokenResult.
+ */
+ public abstract RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);
+
+ /**
+ * <p>Expire a Delegation Token.</p>
+ *
+ * <p>This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options.
+ * This will expire the token immediately. See the overload for more details.</p>
+ *
+ * @param hmac HMAC of the Delegation token
+ * @return The ExpireDelegationTokenResult.
+ */
+ public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
+ return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
+ }
+
+ /**
+ * <p>Expire a Delegation Token.</p>
+ *
+ * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+ * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code ExpireDelegationTokenResult}</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+ * if the delegation token is not found on server.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+ * if the authenticated user is not owner/renewer of the requested token.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+ * if the delegation token is expired.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
+ * </ul>
+ *
+ * @param hmac HMAC of the Delegation token
+ * @param options The options to use when expiring delegation token.
+ * @return The ExpireDelegationTokenResult.
+ */
+ public abstract ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);
+
+ /**
+ *<p>Describe the Delegation Tokens.</p>
+ *
+ * <p>This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
+ * This will return all the user owned tokens and tokens where user have Describe permission. See the overload for more details.</p>
+ *
+ * @return The DescribeDelegationTokenResult.
+ */
+ public DescribeDelegationTokenResult describeDelegationToken() {
+ return describeDelegationToken(new DescribeDelegationTokenOptions());
+ }
+
+ /**
+ * <p>Describe the Delegation Tokens.</p>
+ *
+ * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+ * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@code DescribeDelegationTokenResult}</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
+ * </ul>
+ *
+ * @param options The options to use when describing delegation tokens.
+ * @return The DescribeDelegationTokenResult.
+ */
+ public abstract DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
+
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
new file mode 100644
index 0000000..1b77b94
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+/**
+ * Options for {@link AdminClient#createDelegationToken(CreateDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class CreateDelegationTokenOptions extends AbstractOptions<CreateDelegationTokenOptions> {
+ private long maxLifeTimeMs = -1;
+ private List<KafkaPrincipal> renewers = new LinkedList<>();
+
+ public CreateDelegationTokenOptions renewers(List<KafkaPrincipal> renewers) {
+ this.renewers = renewers;
+ return this;
+ }
+
+ public List<KafkaPrincipal> renewers() {
+ return renewers;
+ }
+
+ public CreateDelegationTokenOptions maxlifeTimeMs(long maxLifeTimeMs) {
+ this.maxLifeTimeMs = maxLifeTimeMs;
+ return this;
+ }
+
+ public long maxlifeTimeMs() {
+ return maxLifeTimeMs;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
new file mode 100644
index 0000000..043cbe8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+
+/**
+ * The result of the {@link KafkaAdminClient#createDelegationToken(CreateDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class CreateDelegationTokenResult {
+ private final KafkaFuture<DelegationToken> delegationToken;
+
+ CreateDelegationTokenResult(KafkaFuture<DelegationToken> delegationToken) {
+ this.delegationToken = delegationToken;
+ }
+
+ /**
+ * Returns a future which yields a delegation token
+ */
+ public KafkaFuture<DelegationToken> delegationToken() {
+ return delegationToken;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
new file mode 100644
index 0000000..60b9935
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+/**
+ * Options for {@link AdminClient#describeDelegationToken(DescribeDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDelegationTokenOptions> {
+ private List<KafkaPrincipal> owners;
+
+ /**
+ * if owners is null, all the user owned tokens and tokens where user have Describe permission
+ * will be returned.
+ * @param owners
+ * @return this instance
+ */
+ public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners) {
+ this.owners = owners;
+ return this;
+ }
+
+ public List<KafkaPrincipal> owners() {
+ return owners;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
new file mode 100644
index 0000000..7a9d4b9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+
+/**
+ * The result of the {@link KafkaAdminClient#describeDelegationToken(DescribeDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeDelegationTokenResult {
+ private final KafkaFuture<List<DelegationToken>> delegationTokens;
+
+ DescribeDelegationTokenResult(KafkaFuture<List<DelegationToken>> delegationTokens) {
+ this.delegationTokens = delegationTokens;
+ }
+
+ /**
+ * Returns a future which yields list of delegation tokens
+ */
+ public KafkaFuture<List<DelegationToken>> delegationTokens() {
+ return delegationTokens;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
similarity index 54%
copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
index 7490a3e..138cd4e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
@@ -14,18 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.token.delegation;
-import org.apache.kafka.common.security.scram.ScramCredentialCallback;
+package org.apache.kafka.clients.admin;
-public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
- private String tokenOwner;
+import org.apache.kafka.common.annotation.InterfaceStability;
- public void tokenOwner(String tokenOwner) {
- this.tokenOwner = tokenOwner;
+/**
+ * Options for {@link AdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ExpireDelegationTokenOptions extends AbstractOptions<ExpireDelegationTokenOptions> {
+ private long expiryTimePeriodMs = -1L;
+
+ public ExpireDelegationTokenOptions expiryTimePeriodMs(long expiryTimePeriodMs) {
+ this.expiryTimePeriodMs = expiryTimePeriodMs;
+ return this;
}
- public String tokenOwner() {
- return tokenOwner;
+ public long expiryTimePeriodMs() {
+ return expiryTimePeriodMs;
}
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
similarity index 52%
copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
index 7490a3e..41782bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
@@ -14,18 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.token.delegation;
-import org.apache.kafka.common.security.scram.ScramCredentialCallback;
+package org.apache.kafka.clients.admin;
-public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
- private String tokenOwner;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
- public void tokenOwner(String tokenOwner) {
- this.tokenOwner = tokenOwner;
+/**
+ * The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ExpireDelegationTokenResult {
+ private final KafkaFuture<Long> expiryTimestamp;
+
+ ExpireDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
+ this.expiryTimestamp = expiryTimestamp;
}
- public String tokenOwner() {
- return tokenOwner;
+ /**
+ * Returns a future which yields expiry timestamp
+ */
+ public KafkaFuture<Long> expiryTimestamp() {
+ return expiryTimestamp;
}
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5118953..3ac0e28 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -69,6 +69,8 @@ import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
import org.apache.kafka.common.requests.CreateAclsResponse;
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
+import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
@@ -85,12 +87,20 @@ import org.apache.kafka.common.requests.DescribeAclsRequest;
import org.apache.kafka.common.requests.DescribeAclsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
+import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.kafka.common.requests.Resource;
import org.apache.kafka.common.requests.ResourceType;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
@@ -2072,4 +2082,131 @@ public class KafkaAdminClient extends AdminClient {
return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures));
}
+
+ @Override
+ public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) {
+ final KafkaFutureImpl<DelegationToken> delegationTokenFuture = new KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new CreateDelegationTokenRequest.Builder(options.renewers(), options.maxlifeTimeMs());
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ CreateDelegationTokenResponse response = (CreateDelegationTokenResponse) abstractResponse;
+ if (response.hasError()) {
+ delegationTokenFuture.completeExceptionally(response.error().exception());
+ } else {
+ TokenInformation tokenInfo = new TokenInformation(response.tokenId(), response.owner(),
+ options.renewers(), response.issueTimestamp(), response.maxTimestamp(), response.expiryTimestamp());
+ DelegationToken token = new DelegationToken(tokenInfo, response.hmacBytes());
+ delegationTokenFuture.complete(token);
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ delegationTokenFuture.completeExceptionally(throwable);
+ }
+ }, now);
+
+ return new CreateDelegationTokenResult(delegationTokenFuture);
+ }
+
+ @Override
+ public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) {
+ final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new RenewDelegationTokenRequest.Builder(hmac, options.renewTimePeriodMs());
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ RenewDelegationTokenResponse response = (RenewDelegationTokenResponse) abstractResponse;
+ if (response.hasError()) {
+ expiryTimeFuture.completeExceptionally(response.error().exception());
+ } else {
+ expiryTimeFuture.complete(response.expiryTimestamp());
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ expiryTimeFuture.completeExceptionally(throwable);
+ }
+ }, now);
+
+ return new RenewDelegationTokenResult(expiryTimeFuture);
+ }
+
+ @Override
+ public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) {
+ final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs());
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ ExpireDelegationTokenResponse response = (ExpireDelegationTokenResponse) abstractResponse;
+ if (response.hasError()) {
+ expiryTimeFuture.completeExceptionally(response.error().exception());
+ } else {
+ expiryTimeFuture.complete(response.expiryTimestamp());
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ expiryTimeFuture.completeExceptionally(throwable);
+ }
+ }, now);
+
+ return new ExpireDelegationTokenResult(expiryTimeFuture);
+ }
+
+ @Override
+ public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) {
+ final KafkaFutureImpl<List<DelegationToken>> tokensFuture = new KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new DescribeDelegationTokenRequest.Builder(options.owners());
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ DescribeDelegationTokenResponse response = (DescribeDelegationTokenResponse) abstractResponse;
+ if (response.hasError()) {
+ tokensFuture.completeExceptionally(response.error().exception());
+ } else {
+ tokensFuture.complete(response.tokens());
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ tokensFuture.completeExceptionally(throwable);
+ }
+ }, now);
+
+ return new DescribeDelegationTokenResult(tokensFuture);
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
similarity index 54%
copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
index 7490a3e..238dc4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
@@ -14,18 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.token.delegation;
-import org.apache.kafka.common.security.scram.ScramCredentialCallback;
+package org.apache.kafka.clients.admin;
-public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
- private String tokenOwner;
+import org.apache.kafka.common.annotation.InterfaceStability;
- public void tokenOwner(String tokenOwner) {
- this.tokenOwner = tokenOwner;
+/**
+ * Options for {@link AdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class RenewDelegationTokenOptions extends AbstractOptions<RenewDelegationTokenOptions> {
+ private long renewTimePeriodMs = -1;
+
+ public RenewDelegationTokenOptions renewTimePeriodMs(long renewTimePeriodMs) {
+ this.renewTimePeriodMs = renewTimePeriodMs;
+ return this;
}
- public String tokenOwner() {
- return tokenOwner;
+ public long renewTimePeriodMs() {
+ return renewTimePeriodMs;
}
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
similarity index 52%
copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
index 7490a3e..38cdf1a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
@@ -14,18 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.token.delegation;
-import org.apache.kafka.common.security.scram.ScramCredentialCallback;
+package org.apache.kafka.clients.admin;
-public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
- private String tokenOwner;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
- public void tokenOwner(String tokenOwner) {
- this.tokenOwner = tokenOwner;
+/**
+ * The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class RenewDelegationTokenResult {
+ private final KafkaFuture<Long> expiryTimestamp;
+
+ RenewDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
+ this.expiryTimestamp = expiryTimestamp;
}
- public String tokenOwner() {
- return tokenOwner;
+ /**
+ * Returns a future which yields expiry timestamp
+ */
+ public KafkaFuture<Long> expiryTimestamp() {
+ return expiryTimestamp;
}
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 80ccb7e..078d844 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
import org.apache.kafka.common.utils.Utils;
import java.util.Collections;
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 5502164..3985c7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -41,7 +41,7 @@ import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internal.ScramMechanism;
import org.apache.kafka.common.security.scram.internal.ScramServerCallbackHandler;
import org.apache.kafka.common.security.ssl.SslFactory;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
import org.apache.kafka.common.utils.Java;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index dba29ea..7ba270a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -184,4 +184,8 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
public List<DelegationToken> tokens() {
return tokens;
}
+
+ public boolean hasError() {
+ return this.error != Errors.NONE;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 0d43440..40f0aad 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -88,9 +88,9 @@ public class ExpireDelegationTokenRequest extends AbstractRequest {
private final ByteBuffer hmac;
private final long expiryTimePeriod;
- public Builder(ByteBuffer hmac, long expiryTimePeriod) {
+ public Builder(byte[] hmac, long expiryTimePeriod) {
super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
- this.hmac = hmac;
+ this.hmac = ByteBuffer.wrap(hmac);
this.expiryTimePeriod = expiryTimePeriod;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
index f7e0ec4..1a673bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -93,4 +93,8 @@ public class ExpireDelegationTokenResponse extends AbstractResponse {
public int throttleTimeMs() {
return throttleTimeMs;
}
+
+ public boolean hasError() {
+ return this.error != Errors.NONE;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
index 4a4b762..a65c705 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -88,9 +88,9 @@ public class RenewDelegationTokenRequest extends AbstractRequest {
private final ByteBuffer hmac;
private final long renewTimePeriod;
- public Builder(ByteBuffer hmac, long renewTimePeriod) {
+ public Builder(byte[] hmac, long renewTimePeriod) {
super(ApiKeys.RENEW_DELEGATION_TOKEN);
- this.hmac = hmac;
+ this.hmac = ByteBuffer.wrap(hmac);
this.renewTimePeriod = renewTimePeriod;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
index 1885b48..3233f5c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
@@ -93,4 +93,8 @@ public class RenewDelegationTokenResponse extends AbstractResponse {
public long expiryTimestamp() {
return expiryTimestamp;
}
+
+ public boolean hasError() {
+ return this.error != Errors.NONE;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
index deee0b8..6085727 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
@@ -40,7 +40,7 @@ import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFinal
import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFirstMessage;
import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFinalMessage;
import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFirstMessage;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
index 377aa3d..9a3f0dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
@@ -29,8 +29,8 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramCredentialCallback;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
public class ScramServerCallbackHandler implements AuthenticateCallbackHandler {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
index 05ccbda..e1f97c1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
@@ -16,11 +16,16 @@
*/
package org.apache.kafka.common.security.token.delegation;
+import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.utils.Base64;
-import java.nio.ByteBuffer;
import java.util.Arrays;
+/**
+ * A class representing a delegation token.
+ *
+ */
+@InterfaceStability.Evolving
public class DelegationToken {
private TokenInformation tokenInformation;
private byte[] hmac;
@@ -42,10 +47,6 @@ public class DelegationToken {
return Base64.encoder().encodeToString(hmac);
}
- public ByteBuffer hmacBuffer() {
- return ByteBuffer.wrap(hmac);
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
index 1d500d2..ffd2af3 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
@@ -16,11 +16,17 @@
*/
package org.apache.kafka.common.security.token.delegation;
+import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.util.ArrayList;
import java.util.Collection;
+/**
+ * A class representing a delegation token details.
+ *
+ */
+@InterfaceStability.Evolving
public class TokenInformation {
private KafkaPrincipal owner;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
similarity index 95%
rename from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
rename to clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
index adea210..c05b735 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package org.apache.kafka.common.security.token.delegation;
+package org.apache.kafka.common.security.token.delegation.internal;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internal.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.internal.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
import java.util.Collection;
import java.util.HashMap;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
similarity index 94%
rename from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
rename to clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
index 7490a3e..294d7b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.security.token.delegation;
+package org.apache.kafka.common.security.token.delegation.internal;
import org.apache.kafka.common.security.scram.ScramCredentialCallback;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c141a8a..0f5df38 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -277,6 +277,26 @@ public class MockAdminClient extends AdminClient {
}
@Override
+ public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index fab8e93..68979a1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -44,7 +44,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
/**
* Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index bdbd106..c63cecd 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1223,7 +1223,7 @@ public class RequestResponseTest {
}
private RenewDelegationTokenRequest createRenewTokenRequest() {
- return new RenewDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+ return new RenewDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
}
private RenewDelegationTokenResponse createRenewTokenResponse() {
@@ -1231,7 +1231,7 @@ public class RequestResponseTest {
}
private ExpireDelegationTokenRequest createExpireTokenRequest() {
- return new ExpireDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+ return new ExpireDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
}
private ExpireDelegationTokenResponse createExpireTokenResponse() {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
index 3c4b82d..f6e43f9 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
import org.junit.Before;
import org.junit.Test;
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index c010ba0..bcc11fd 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -35,8 +35,6 @@ import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
@@ -342,33 +340,6 @@ class AdminClient(val time: Time,
ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
}
- def createToken(renewers: List[KafkaPrincipal], maxTimePeriodMs: Long = -1): (Errors, DelegationToken) = {
- val responseBody = sendAnyNode(ApiKeys.CREATE_DELEGATION_TOKEN, new CreateDelegationTokenRequest.Builder(renewers.asJava, maxTimePeriodMs))
- val response = responseBody.asInstanceOf[CreateDelegationTokenResponse]
- val tokenInfo = new TokenInformation(response.tokenId, response.owner, renewers.asJava,
- response.issueTimestamp, response.maxTimestamp, response.expiryTimestamp)
- (response.error, new DelegationToken(tokenInfo, response.hmacBytes))
- }
-
- def renewToken(hmac: ByteBuffer, renewTimePeriod: Long = -1): (Errors, Long) = {
- val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new RenewDelegationTokenRequest.Builder(hmac, renewTimePeriod))
- val response = responseBody.asInstanceOf[RenewDelegationTokenResponse]
- (response.error, response.expiryTimestamp)
- }
-
- def expireToken(hmac: ByteBuffer, expiryTimeStamp: Long = -1): (Errors, Long) = {
- val responseBody = sendAnyNode(ApiKeys.EXPIRE_DELEGATION_TOKEN, new ExpireDelegationTokenRequest.Builder(hmac, expiryTimeStamp))
- val response = responseBody.asInstanceOf[ExpireDelegationTokenResponse]
- (response.error, response.expiryTimestamp)
- }
-
- def describeToken(owners: List[KafkaPrincipal]): (Errors, List[DelegationToken]) = {
- val ownersList = if (owners == null) null else owners.asJava
- val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new DescribeDelegationTokenRequest.Builder(ownersList))
- val response = responseBody.asInstanceOf[DescribeDelegationTokenResponse]
- (response.error, response.tokens().asScala.toList)
- }
-
def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
def coordinatorLookup(group: String): Either[Node, Errors] = {
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
index 6c5d1ce..0e6ea86 100644
--- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -17,12 +17,13 @@
package kafka.admin
-import java.nio.ByteBuffer
+import java.text.SimpleDateFormat
+import java.util
-import joptsimple._
+import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser}
import kafka.utils.{CommandLineUtils, Exit, Logging}
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.clients.admin.{CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.common.utils.{Base64, SecurityUtils, Utils}
@@ -71,19 +72,20 @@ object DelegationTokenCommand extends Logging {
}
}
- def createToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
- val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt)
+ def createToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): DelegationToken = {
+ val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt).getOrElse(new util.LinkedList[KafkaPrincipal]())
val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue
println("Calling create token operation with renewers :" + renewerPrincipals +" , max-life-time-period :"+ maxLifeTimeMs)
- val response = adminClient.createToken(renewerPrincipals, maxLifeTimeMs)
- response match {
- case (Errors.NONE, token) => println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
- case (e, _) => throw new AdminOperationException(e.message)
- }
+ val createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals)
+ val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
+ val token = createResult.delegationToken().get()
+ println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
+ token
}
def printToken(tokens: List[DelegationToken]): Unit = {
+ val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
print("\n%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format("TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
for (token <- tokens) {
val tokenInfo = token.tokenInfo
@@ -92,56 +94,59 @@ object DelegationTokenCommand extends Logging {
token.hmacAsBase64String,
tokenInfo.owner,
tokenInfo.renewersAsString,
- tokenInfo.issueTimestamp,
- tokenInfo.expiryTimestamp,
- tokenInfo.maxTimestamp))
+ dateFormat.format(tokenInfo.issueTimestamp),
+ dateFormat.format(tokenInfo.expiryTimestamp),
+ dateFormat.format(tokenInfo.maxTimestamp)))
println()
}
}
- private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): List[KafkaPrincipal] = {
+ private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Option[util.List[KafkaPrincipal]] = {
if (opts.options.has(principalOptionSpec))
- opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList
+ Some(opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList.asJava)
else
- List.empty[KafkaPrincipal]
+ None
}
- def renewToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+ def renewToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
val hmac = opts.options.valueOf(opts.hmacOpt)
val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs)
- val response = adminClient.renewToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), renewTimePeriodMs)
- response match {
- case (Errors.NONE, expiryTimeStamp) => println("Completed renew operation. New expiry timestamp : %s".format(expiryTimeStamp))
- case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
- }
+ val renewResult = adminClient.renewDelegationToken(Base64.decoder.decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs))
+ val expiryTimeStamp = renewResult.expiryTimestamp().get()
+ val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+ println("Completed renew operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
+ expiryTimeStamp
}
- def expireToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+ def expireToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
val hmac = opts.options.valueOf(opts.hmacOpt)
val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs)
- val response = adminClient.expireToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), expiryTimePeriodMs)
- response match {
- case (Errors.NONE, expiryTimeStamp) => println("Completed expire operation. New expiry timestamp : %s".format(expiryTimeStamp))
- case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
- }
+ val expireResult = adminClient.expireDelegationToken(Base64.decoder.decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
+ val expiryTimeStamp = expireResult.expiryTimestamp().get()
+ val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+ println("Completed expire operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
+ expiryTimeStamp
}
- def describeToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+ def describeToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): List[DelegationToken] = {
val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt)
- println("Calling describe token operation for owners :" + ownerPrincipals)
- val response = adminClient.describeToken(ownerPrincipals)
- response match {
- case (Errors.NONE, tokens) => println("Total Number of tokens : %s".format(tokens.size)); printToken(tokens)
- case (e, tokens) => throw new AdminOperationException(e.message)
- }
+ if (ownerPrincipals.isEmpty)
+ println("Calling describe token operation for current user.")
+ else
+ println("Calling describe token operation for owners :" + ownerPrincipals.get)
+
+ val describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals.orNull))
+ val tokens = describeResult.delegationTokens().get().asScala.toList
+ println("Total number of tokens : %s".format(tokens.size)); printToken(tokens)
+ tokens
}
- private def createAdminClient(opts: DelegationTokenCommandOptions): AdminClient = {
+ private def createAdminClient(opts: DelegationTokenCommandOptions): JAdminClient = {
val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
- AdminClient.create(props)
+ JAdminClient.create(props)
}
class DelegationTokenCommandOptions(args: Array[String]) {
@@ -157,10 +162,11 @@ object DelegationTokenCommand extends Logging {
.withRequiredArg
.ofType(classOf[String])
- val createOpt = parser.accepts("create", "Create a new delegation token.")
- val renewOpt = parser.accepts("renew", "Renew delegation token.")
- val expiryOpt = parser.accepts("expire", "Expire delegation token.")
- val describeOpt = parser.accepts("describe", "describe delegation tokens.")
+ val createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.")
+ val renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.")
+ val expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.")
+ val describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
+ " If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.")
val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.")
.withOptionalArg()
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 6f9c252..f208087 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef._
import org.apache.kafka.common.security.scram.internal.{ScramCredentialUtils, ScramMechanism}
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: DelegationTokenCache) {
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 4a947a1..62a5e20 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -31,7 +31,8 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internal.{ScramFormatter, ScramMechanism}
import org.apache.kafka.common.security.scram.ScramCredential
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{Base64, Sanitizer, SecurityUtils, Time}
import scala.collection.JavaConverters._
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7105688..a0d2c79 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internal.ScramMechanism
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Node}
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index 27a6d11..56a3b8a 100644
--- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -18,10 +18,9 @@ package kafka.api
import java.util
-import kafka.admin.AdminClient
import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internal.ScramMechanism
@@ -83,9 +82,8 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
config.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
- val adminClient = AdminClient.create(config.asScala.toMap)
- val (error, token) = adminClient.createToken(List())
-
+ val adminClient = AdminClient.create(config)
+ val token = adminClient.createDelegationToken().delegationToken().get()
//wait for token to reach all the brokers
TestUtils.waitUntilTrue(() => servers.forall(server => !server.tokenCache.tokens().isEmpty),
"Timed out waiting for token to propagate to all servers")
diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
new file mode 100644
index 0000000..6ae8f5e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.util
+
+import kafka.admin.DelegationTokenCommand.DelegationTokenCommandOptions
+import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.ExecutionException
+
+class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
+ override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+ private val kafkaClientSaslMechanism = "PLAIN"
+ private val kafkaServerSaslMechanisms = List("PLAIN")
+ protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+ protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+ var adminClient: org.apache.kafka.clients.admin.AdminClient = null
+
+ override def numBrokers = 1
+
+ @Before
+ override def setUp(): Unit = {
+ startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+ super.setUp()
+ }
+
+ override def generateConfigs = {
+ val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+ enableControlledShutdown = false, enableDeleteTopic = true,
+ interBrokerSecurityProtocol = Some(securityProtocol),
+ trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
+ props.foreach(propertyOverrides)
+ props.map(KafkaConfig.fromProps)
+ }
+
+ private def createAdminConfig():util.Map[String, Object] = {
+ val config = new util.HashMap[String, Object]
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+ securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+ config
+ }
+
+ @Test
+ def testDelegationTokenRequests(): Unit = {
+ adminClient = org.apache.kafka.clients.admin.AdminClient.create(createAdminConfig)
+ val renewer1 = "User:renewer1"
+ val renewer2 = "User:renewer2"
+
+ // create token1 with renewer1
+ val tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer1)))
+
+ var tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
+ assertTrue(tokens.size == 1)
+ val token1 = tokens.head
+ assertEquals(token1, tokenCreated)
+
+ // create token2 with renewer2
+ val token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer2)))
+
+ tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
+ assertTrue(tokens.size == 2)
+ assertEquals(Set(token1, token2), tokens.toSet)
+
+ //get tokens for renewer2
+ tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer2)))
+ assertTrue(tokens.size == 1)
+ assertEquals(Set(token2), tokens.toSet)
+
+ //test renewing tokens
+ val expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String()))
+ val renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer1))).head
+ assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp())
+
+ //test expire tokens
+ DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String()))
+ DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String()))
+
+ tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
+ assertTrue(tokens.size == 0)
+
+ //create token with invalid renewer principal type
+ intercept[ExecutionException](DelegationTokenCommand.createToken(adminClient, getCreateOpts(List("Group:Renewer3"))))
+
+ // try describing tokens for unknown owner
+ assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List("User:Unknown"))).isEmpty)
+ }
+
+ private def getCreateOpts(renewers: List[String]): DelegationTokenCommandOptions = {
+ val opts = ListBuffer("--bootstrap-server", brokerList, "--max-life-time-period", "-1",
+ "--command-config", "testfile", "--create")
+ renewers.foreach(renewer => opts ++= ListBuffer("--renewer-principal", renewer))
+ new DelegationTokenCommandOptions(opts.toArray)
+ }
+
+ private def getDescribeOpts(owners: List[String]): DelegationTokenCommandOptions = {
+ val opts = ListBuffer("--bootstrap-server", brokerList, "--command-config", "testfile", "--describe")
+ owners.foreach(owner => opts ++= ListBuffer("--owner-principal", owner))
+ new DelegationTokenCommandOptions(opts.toArray)
+ }
+
+ private def getRenewOpts(hmac: String): DelegationTokenCommandOptions = {
+ val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--renew",
+ "--renew-time-period", "-1",
+ "--hmac", hmac)
+ new DelegationTokenCommandOptions(opts)
+ }
+
+ private def getExpireOpts(hmac: String): DelegationTokenCommandOptions = {
+ val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--expire",
+ "--expiry-time-period", "-1",
+ "--hmac", hmac)
+ new DelegationTokenCommandOptions(opts)
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ if (adminClient != null)
+ adminClient.close()
+ super.tearDown()
+ closeSasl()
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index b8388b4..6093622 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -30,7 +30,8 @@ import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internal.ScramMechanism
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{MockTime, SecurityUtils}
import org.junit.Assert._
import org.junit.{After, Before, Test}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index 4c42dd2..3d4be53 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -19,14 +19,13 @@ package kafka.server
import java.nio.ByteBuffer
import java.util
-import kafka.admin.AdminClient
import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
-import org.junit.Assert._
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
var adminClient: AdminClient = null
@@ -49,21 +48,19 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
@Test
def testDelegationTokenRequests(): Unit = {
- adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
+ adminClient = AdminClient.create(createAdminConfig)
- val createResponse = adminClient.createToken(List())
- assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, createResponse._1)
+ val createResult = adminClient.createDelegationToken()
+ intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
- val describeResponse = adminClient.describeToken(List())
- assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, describeResponse._1)
+ val describeResult = adminClient.describeDelegationToken()
+ intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
- //test renewing tokens
- val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
- assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, renewResponse._1)
+ val renewResult = adminClient.renewDelegationToken("".getBytes())
+ intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
- //test expire tokens tokens
- val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
- assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, expireResponse._1)
+ val expireResult = adminClient.expireDelegationToken("".getBytes())
+ intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index 55bf5fd..a002750 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -18,17 +18,17 @@ package kafka.server
import java.util
-import kafka.admin.AdminClient
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
+import org.apache.kafka.common.errors.InvalidPrincipalTypeException
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.SecurityUtils
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -46,15 +46,6 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
super.setUp()
}
- def createAdminConfig():util.Map[String, Object] = {
- val config = new util.HashMap[String, Object]
- config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
- val securityProps: util.Map[Object, Object] =
- TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
- securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
- config
- }
-
override def generateConfigs = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
enableControlledShutdown = false, enableDeleteTopic = true,
@@ -64,46 +55,73 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
props.map(KafkaConfig.fromProps)
}
+ private def createAdminConfig():util.Map[String, Object] = {
+ val config = new util.HashMap[String, Object]
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+ securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+ config
+ }
+
@Test
def testDelegationTokenRequests(): Unit = {
- adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
-
- // test creating token
- val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
- val tokenResult1 = adminClient.createToken(renewer1)
- assertEquals(Errors.NONE, tokenResult1._1)
- var token1 = adminClient.describeToken(null)._2.head
- assertEquals(token1, tokenResult1._2)
+ adminClient = AdminClient.create(createAdminConfig)
+
+ // create token1 with renewer1
+ val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:renewer1")).asJava
+ val createResult1 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer1))
+ val tokenCreated = createResult1.delegationToken().get()
+
+ //test describe token
+ var tokens = adminClient.describeDelegationToken().delegationTokens().get()
+ assertTrue(tokens.size() == 1)
+ var token1 = tokens.get(0)
+ assertEquals(token1, tokenCreated)
+
+ // create token2 with renewer2
+ val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:renewer2")).asJava
+ val createResult2 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer2))
+ val token2 = createResult2.delegationToken().get()
+
+ //get all tokens
+ tokens = adminClient.describeDelegationToken().delegationTokens().get()
+ assertTrue(tokens.size() == 2)
+ assertEquals(Set(token1, token2), tokens.asScala.toSet)
+
+ //get tokens for renewer2
+ tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(renewer2)).delegationTokens().get()
+ assertTrue(tokens.size() == 1)
+ assertEquals(Set(token2), tokens.asScala.toSet)
//test renewing tokens
- val renewResponse = adminClient.renewToken(token1.hmacBuffer())
- assertEquals(Errors.NONE, renewResponse._1)
-
- token1 = adminClient.describeToken(null)._2.head
- assertEquals(renewResponse._2, token1.tokenInfo().expiryTimestamp())
+ val renewResult = adminClient.renewDelegationToken(token1.hmac())
+ var expiryTimestamp = renewResult.expiryTimestamp().get()
- //test describe tokens
- val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:Renewer1"))
- val tokenResult2 = adminClient.createToken(renewer2)
- assertEquals(Errors.NONE, tokenResult2._1)
- val token2 = tokenResult2._2
+ val describeResult = adminClient.describeDelegationToken()
+ val tokenId = token1.tokenInfo().tokenId()
+ token1 = describeResult.delegationTokens().get().asScala.filter(dt => dt.tokenInfo().tokenId() == tokenId).head
+ assertEquals(expiryTimestamp, token1.tokenInfo().expiryTimestamp())
- assertTrue(adminClient.describeToken(null)._2.size == 2)
+ //test expire tokens
+ val expireResult1 = adminClient.expireDelegationToken(token1.hmac())
+ expiryTimestamp = expireResult1.expiryTimestamp().get()
- //test expire tokens tokens
- val expireResponse1 = adminClient.expireToken(token1.hmacBuffer())
- assertEquals(Errors.NONE, expireResponse1._1)
+ val expireResult2 = adminClient.expireDelegationToken(token2.hmac())
+ expiryTimestamp = expireResult2.expiryTimestamp().get()
- val expireResponse2 = adminClient.expireToken(token2.hmacBuffer())
- assertEquals(Errors.NONE, expireResponse2._1)
-
- assertTrue(adminClient.describeToken(null)._2.size == 0)
+ tokens = adminClient.describeDelegationToken().delegationTokens().get()
+ assertTrue(tokens.size == 0)
//create token with invalid principal type
- val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer1"))
- val tokenResult3 = adminClient.createToken(renewer3)
- assertEquals(Errors.INVALID_PRINCIPAL_TYPE, tokenResult3._1)
-
+ val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava
+ val createResult3 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer3))
+ intercept[ExecutionException](createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException]
+
+ // try describing tokens for unknown owner
+ val unknownOwner = List(SecurityUtils.parseKafkaPrincipal("User:Unknown")).asJava
+ tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(unknownOwner)).delegationTokens().get()
+ assertTrue(tokens.isEmpty)
}
@After
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index 0561cac..8f8842b 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -19,17 +19,15 @@ package kafka.server
import java.nio.ByteBuffer
import java.util
-import kafka.admin.AdminClient
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.SecurityUtils
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.errors.DelegationTokenDisabledException
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -58,23 +56,19 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
@Test
def testDelegationTokenRequests(): Unit = {
- adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
-
- val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
- val createResponse = adminClient.createToken(renewer1)
- assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createResponse._1)
+ adminClient = AdminClient.create(createAdminConfig)
- val describeResponse = adminClient.describeToken(List())
- assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, describeResponse._1)
+ val createResult = adminClient.createDelegationToken()
+ intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
- //test renewing tokens
- val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
- assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, renewResponse._1)
+ val describeResult = adminClient.describeDelegationToken()
+ intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
- //test expire tokens tokens
- val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
- assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, expireResponse._1)
+ val renewResult = adminClient.renewDelegationToken("".getBytes())
+ intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
+ val expireResult = adminClient.expireDelegationToken("".getBytes())
+ intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
}
@After
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 2a7d6d4..ed85415 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -315,13 +315,13 @@ class RequestQuotaTest extends BaseRequestTest {
new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000)
case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
- new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+ new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
case ApiKeys.RENEW_DELEGATION_TOKEN =>
- new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+ new RenewDelegationTokenRequest.Builder("".getBytes, 1000)
case ApiKeys.DELETE_GROUPS =>
new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.