You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/07 12:04:36 UTC
[kafka] branch 1.1 updated: KAFKA-6532: Reduce impact of delegation
tokens on public interfaces (#4524)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 6087180 KAFKA-6532: Reduce impact of delegation tokens on public interfaces (#4524)
6087180 is described below
commit 608718098f1c14e3d2ddaf28e92c06d781ca1788
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Feb 7 03:29:08 2018 -0800
KAFKA-6532: Reduce impact of delegation tokens on public interfaces (#4524)
Keep delegation token implementation internal without exposing implementation details to pluggable classes:
1. KafkaPrincipal#tokenAuthenticated must always be set by SaslServerAuthenticator so that custom PrincipalBuilders cannot override it.
2. Replace o.a.k.c.security.scram.DelegationTokenAuthenticationCallback with a more generic ScramExtensionsCallback that can be used to add more extensions in the future.
3. Separate out ScramCredentialCallback (KIP-86 makes this a public interface) from delegation token credential callback (which is internal).
Reviewers: Jun Rao <ju...@gmail.com>, Manikumar Reddy <ma...@gmail.com>
---
.../kafka/common/security/auth/KafkaPrincipal.java | 12 ++--
.../DefaultKafkaPrincipalBuilder.java | 10 +--
.../authenticator/SaslClientCallbackHandler.java | 10 +--
.../authenticator/SaslServerAuthenticator.java | 7 +-
.../security/scram/ScramCredentialCallback.java | 24 -------
.../common/security/scram/ScramExtensions.java | 78 ++++++++++++++++++++++
...nCallback.java => ScramExtensionsCallback.java} | 14 ++--
.../common/security/scram/ScramLoginModule.java | 6 +-
.../kafka/common/security/scram/ScramMessages.java | 31 +++------
.../common/security/scram/ScramSaslClient.java | 8 +--
.../common/security/scram/ScramSaslServer.java | 43 +++++++-----
.../security/scram/ScramServerCallbackHandler.java | 15 +++--
.../DelegationTokenCredentialCallback.java} | 17 +++--
.../auth/DefaultKafkaPrincipalBuilderTest.java | 2 -
.../common/security/scram/ScramMessagesTest.java | 6 +-
15 files changed, 170 insertions(+), 113 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
index 10bf76d..74bc9c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -47,16 +47,11 @@ public class KafkaPrincipal implements Principal {
private final String principalType;
private final String name;
- private boolean tokenAuthenticated;
+ private volatile boolean tokenAuthenticated;
public KafkaPrincipal(String principalType, String name) {
- this(principalType, name, false);
- }
-
- public KafkaPrincipal(String principalType, String name, boolean tokenauth) {
this.principalType = requireNonNull(principalType, "Principal type cannot be null");
this.name = requireNonNull(name, "Principal name cannot be null");
- this.tokenAuthenticated = tokenauth;
}
/**
@@ -91,7 +86,6 @@ public class KafkaPrincipal implements Principal {
public int hashCode() {
int result = principalType != null ? principalType.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
- result = 31 * result + (tokenAuthenticated ? 1 : 0);
return result;
}
@@ -104,6 +98,10 @@ public class KafkaPrincipal implements Principal {
return principalType;
}
+ public void tokenAuthenticated(boolean tokenAuthenticated) {
+ this.tokenAuthenticated = tokenAuthenticated;
+ }
+
public boolean tokenAuthenticated() {
return tokenAuthenticated;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
index 7404238..30b0a3e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.scram.ScramLoginModule;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
@@ -119,13 +118,8 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
SaslServer saslServer = ((SaslAuthenticationContext) context).server();
if (SaslConfigs.GSSAPI_MECHANISM.equals(saslServer.getMechanismName()))
return applyKerberosShortNamer(saslServer.getAuthorizationID());
- else {
- Boolean isTokenAuthenticated = (Boolean) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG);
- if (isTokenAuthenticated != null && isTokenAuthenticated)
- return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(), true);
- else
- return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
- }
+ else
+ return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
} else {
throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index de96cef..31c51c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -28,7 +28,7 @@ import javax.security.sasl.RealmCallback;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.scram.DelegationTokenAuthenticationCallback;
+import org.apache.kafka.common.security.scram.ScramExtensionsCallback;
/**
* Callback handler for Sasl clients. The callbacks required for the SASL mechanism
@@ -81,10 +81,10 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler {
ac.setAuthorized(authId.equals(authzId));
if (ac.isAuthorized())
ac.setAuthorizedID(authzId);
- } else if (callback instanceof DelegationTokenAuthenticationCallback) {
- DelegationTokenAuthenticationCallback tc = (DelegationTokenAuthenticationCallback) callback;
- if (!isKerberos && subject != null && !subject.getPublicCredentials(Boolean.class).isEmpty()) {
- tc.tokenauth(subject.getPublicCredentials(Boolean.class).iterator().next());
+ } else if (callback instanceof ScramExtensionsCallback) {
+ ScramExtensionsCallback sc = (ScramExtensionsCallback) callback;
+ if (!isKerberos && subject != null && !subject.getPublicCredentials(Map.class).isEmpty()) {
+ sc.extensions((Map<String, String>) subject.getPublicCredentials(Map.class).iterator().next());
}
} else {
throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index ca6e9d2..2a80e5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.apache.kafka.common.security.scram.ScramServerCallbackHandler;
import org.apache.kafka.common.utils.Utils;
@@ -297,7 +298,11 @@ public class SaslServerAuthenticator implements Authenticator {
@Override
public KafkaPrincipal principal() {
SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol, clientAddress());
- return principalBuilder.build(context);
+ KafkaPrincipal principal = principalBuilder.build(context);
+ if (ScramMechanism.isScram(saslMechanism) && Boolean.parseBoolean((String) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG))) {
+ principal.tokenAuthenticated(true);
+ }
+ return principal;
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
index 7f3601c..931210a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
@@ -20,36 +20,12 @@ import javax.security.auth.callback.Callback;
public class ScramCredentialCallback implements Callback {
private ScramCredential scramCredential;
- private boolean tokenAuthenticated;
- private String tokenOwner;
- private String mechanism;
-
- public ScramCredentialCallback(boolean tokenAuthenticated, String mechanism) {
- this.tokenAuthenticated = tokenAuthenticated;
- this.mechanism = mechanism;
- }
public ScramCredential scramCredential() {
return scramCredential;
}
- public boolean tokenauth() {
- return tokenAuthenticated;
- }
-
public void scramCredential(ScramCredential scramCredential) {
this.scramCredential = scramCredential;
}
-
- public void tokenOwner(String tokenOwner) {
- this.tokenOwner = tokenOwner;
- }
-
- public String tokenOwner() {
- return tokenOwner;
- }
-
- public String mechanism() {
- return mechanism;
- }
}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java
new file mode 100644
index 0000000..0f461c0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.scram;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class ScramExtensions {
+ private final Map<String, String> extensionMap;
+
+ public ScramExtensions() {
+ this(Collections.<String, String>emptyMap());
+ }
+
+ public ScramExtensions(String extensions) {
+ this(stringToMap(extensions));
+ }
+
+ public ScramExtensions(Map<String, String> extensionMap) {
+ this.extensionMap = extensionMap;
+ }
+
+ public String extensionValue(String name) {
+ return extensionMap.get(name);
+ }
+
+ public Set<String> extensionNames() {
+ return extensionMap.keySet();
+ }
+
+ public boolean tokenAuthenticated() {
+ return Boolean.parseBoolean(extensionMap.get(ScramLoginModule.TOKEN_AUTH_CONFIG));
+ }
+
+ @Override
+ public String toString() {
+ return mapToString(extensionMap);
+ }
+
+ private static Map<String, String> stringToMap(String extensions) {
+ Map<String, String> extensionMap = new HashMap<>();
+
+ if (!extensions.isEmpty()) {
+ String[] attrvals = extensions.split(",");
+ for (String attrval : attrvals) {
+ String[] array = attrval.split("=", 2);
+ extensionMap.put(array[0], array[1]);
+ }
+ }
+ return extensionMap;
+ }
+
+ private static String mapToString(Map<String, String> extensionMap) {
+ StringBuilder builder = new StringBuilder();
+ for (Map.Entry<String, String> entry : extensionMap.entrySet()) {
+ builder.append(entry.getKey());
+ builder.append('=');
+ builder.append(entry.getValue());
+ }
+ return builder.toString();
+ }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
similarity index 72%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
copy to clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
index df6e849..b40468b 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
@@ -18,15 +18,17 @@
package org.apache.kafka.common.security.scram;
import javax.security.auth.callback.Callback;
+import java.util.Collections;
+import java.util.Map;
-public class DelegationTokenAuthenticationCallback implements Callback {
- private boolean tokenauth;
+public class ScramExtensionsCallback implements Callback {
+ private Map<String, String> extensions = Collections.emptyMap();
- public String extension() {
- return ScramLoginModule.TOKEN_AUTH_CONFIG + "=" + Boolean.toString(tokenauth);
+ public Map<String, String> extensions() {
+ return extensions;
}
- public void tokenauth(Boolean tokenauth) {
- this.tokenauth = tokenauth;
+ public void extensions(Map<String, String> extensions) {
+ this.extensions = extensions;
}
}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
index 8000f4c..43df515 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.security.scram;
+import java.util.Collections;
import java.util.Map;
import javax.security.auth.Subject;
@@ -44,7 +45,10 @@ public class ScramLoginModule implements LoginModule {
subject.getPrivateCredentials().add(password);
Boolean useTokenAuthentication = "true".equalsIgnoreCase((String) options.get(TOKEN_AUTH_CONFIG));
- subject.getPublicCredentials().add(useTokenAuthentication);
+ if (useTokenAuthentication) {
+ Map<String, String> scramExtensions = Collections.singletonMap(TOKEN_AUTH_CONFIG, "true");
+ subject.getPublicCredentials().add(scramExtensions);
+ }
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
index e697ea5..05b3d77 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.security.scram;
import org.apache.kafka.common.utils.Base64;
import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -77,7 +76,7 @@ public class ScramMessages {
private final String saslName;
private final String nonce;
private final String authorizationId;
- private final String extensions;
+ private final ScramExtensions extensions;
public ClientFirstMessage(byte[] messageBytes) throws SaslException {
String message = toMessage(messageBytes);
Matcher matcher = PATTERN.matcher(message);
@@ -88,12 +87,13 @@ public class ScramMessages {
this.saslName = matcher.group("saslname");
this.nonce = matcher.group("nonce");
String extString = matcher.group("extensions");
- this.extensions = extString.startsWith(",") ? extString.substring(1) : extString;
+
+ this.extensions = extString.startsWith(",") ? new ScramExtensions(extString.substring(1)) : new ScramExtensions();
}
- public ClientFirstMessage(String saslName, String nonce, String extensions) {
+ public ClientFirstMessage(String saslName, String nonce, Map<String, String> extensions) {
this.saslName = saslName;
this.nonce = nonce;
- this.extensions = extensions;
+ this.extensions = new ScramExtensions(extensions);
this.authorizationId = ""; // Optional authzid not specified in gs2-header
}
public String saslName() {
@@ -108,29 +108,16 @@ public class ScramMessages {
public String gs2Header() {
return "n," + authorizationId + ",";
}
- public String extensions() {
+ public ScramExtensions extensions() {
return extensions;
}
- public Map<String, String> extensionsAsMap() {
- Map<String, String> map = new HashMap<>();
-
- if (extensions.isEmpty())
- return map;
-
- String[] attrvals = extensions.split(",");
- for (String attrval: attrvals) {
- String[] array = attrval.split("=", 2);
- map.put(array[0], array[1]);
- }
- return map;
- }
-
public String clientFirstMessageBare() {
- if (extensions.isEmpty())
+ String extensionStr = extensions.toString();
+ if (extensionStr.isEmpty())
return String.format("n=%s,r=%s", saslName, nonce);
else
- return String.format("n=%s,r=%s,%s", saslName, nonce, extensions);
+ return String.format("n=%s,r=%s,%s", saslName, nonce, extensionStr);
}
String toMessage() {
return gs2Header() + clientFirstMessageBare();
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
index 6b66f5e..71109df 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
@@ -97,18 +97,18 @@ public class ScramSaslClient implements SaslClient {
throw new SaslException("Expected empty challenge");
clientNonce = formatter.secureRandomString();
NameCallback nameCallback = new NameCallback("Name:");
- DelegationTokenAuthenticationCallback tokenAuthCallback = new DelegationTokenAuthenticationCallback();
+ ScramExtensionsCallback extensionsCallback = new ScramExtensionsCallback();
try {
- callbackHandler.handle(new Callback[]{nameCallback, tokenAuthCallback});
+ callbackHandler.handle(new Callback[]{nameCallback, extensionsCallback});
} catch (IOException | UnsupportedCallbackException e) {
throw new SaslException("User name could not be obtained", e);
}
String username = nameCallback.getName();
String saslName = formatter.saslName(username);
- String extension = tokenAuthCallback.extension();
- this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extension);
+ Map<String, String> extensions = extensionsCallback.extensions();
+ this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extensions);
setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
return clientFirstMessage.toBytes();
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
index 94b92b6..314c1d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
@@ -22,6 +22,7 @@ import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@@ -37,6 +38,8 @@ import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ServerFirstMessage;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@ import org.slf4j.LoggerFactory;
public class ScramSaslServer implements SaslServer {
private static final Logger log = LoggerFactory.getLogger(ScramSaslServer.class);
+ private static final Set<String> SUPPORTED_EXTENSIONS = Utils.mkSet(ScramLoginModule.TOKEN_AUTH_CONFIG);
enum State {
RECEIVE_CLIENT_FIRST_MESSAGE,
@@ -65,9 +69,9 @@ public class ScramSaslServer implements SaslServer {
private String username;
private ClientFirstMessage clientFirstMessage;
private ServerFirstMessage serverFirstMessage;
+ private ScramExtensions scramExtensions;
private ScramCredential scramCredential;
- private boolean tokenAuthentication;
- private String tokenOwner;
+ private String authorizationId;
public ScramSaslServer(ScramMechanism mechanism, Map<String, ?> props, CallbackHandler callbackHandler) throws NoSuchAlgorithmException {
this.mechanism = mechanism;
@@ -91,18 +95,29 @@ public class ScramSaslServer implements SaslServer {
switch (state) {
case RECEIVE_CLIENT_FIRST_MESSAGE:
this.clientFirstMessage = new ClientFirstMessage(response);
+ this.scramExtensions = clientFirstMessage.extensions();
+ if (!SUPPORTED_EXTENSIONS.containsAll(scramExtensions.extensionNames())) {
+ log.debug("Unsupported extensions will be ignored, supported {}, provided {}",
+ SUPPORTED_EXTENSIONS, scramExtensions.extensionNames());
+ }
String serverNonce = formatter.secureRandomString();
try {
String saslName = clientFirstMessage.saslName();
this.username = formatter.username(saslName);
- Map<String, String> extensions = clientFirstMessage.extensionsAsMap();
- this.tokenAuthentication = "true".equalsIgnoreCase(extensions.get(ScramLoginModule.TOKEN_AUTH_CONFIG));
NameCallback nameCallback = new NameCallback("username", username);
- ScramCredentialCallback credentialCallback = new ScramCredentialCallback(tokenAuthentication, getMechanismName());
- callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
- this.tokenOwner = credentialCallback.tokenOwner();
- if (tokenAuthentication && tokenOwner == null)
- throw new SaslException("Token Authentication failed: Invalid tokenId : " + username);
+ ScramCredentialCallback credentialCallback;
+ if (scramExtensions.tokenAuthenticated()) {
+ DelegationTokenCredentialCallback tokenCallback = new DelegationTokenCredentialCallback();
+ credentialCallback = tokenCallback;
+ callbackHandler.handle(new Callback[]{nameCallback, tokenCallback});
+ if (tokenCallback.tokenOwner() == null)
+ throw new SaslException("Token Authentication failed: Invalid tokenId : " + username);
+ this.authorizationId = tokenCallback.tokenOwner();
+ } else {
+ credentialCallback = new ScramCredentialCallback();
+ callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
+ this.authorizationId = username;
+ }
this.scramCredential = credentialCallback.scramCredential();
if (scramCredential == null)
throw new SaslException("Authentication failed: Invalid user credentials");
@@ -150,11 +165,7 @@ public class ScramSaslServer implements SaslServer {
public String getAuthorizationID() {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
-
- if (tokenAuthentication)
- return tokenOwner; // return token owner as principal for this session
-
- return username;
+ return authorizationId;
}
@Override
@@ -167,8 +178,8 @@ public class ScramSaslServer implements SaslServer {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
- if (ScramLoginModule.TOKEN_AUTH_CONFIG.equals(propName))
- return tokenAuthentication;
+ if (SUPPORTED_EXTENSIONS.contains(propName))
+ return scramExtensions.extensionValue(propName);
else
return null;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
index a064e8a..5e37eae 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
@@ -28,11 +28,13 @@ import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.authenticator.AuthCallbackHandler;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
public class ScramServerCallbackHandler implements AuthCallbackHandler {
private final CredentialCache.Cache<ScramCredential> credentialCache;
private final DelegationTokenCache tokenCache;
+ private String saslMechanism;
public ScramServerCallbackHandler(CredentialCache.Cache<ScramCredential> credentialCache,
DelegationTokenCache tokenCache) {
@@ -46,13 +48,13 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler {
for (Callback callback : callbacks) {
if (callback instanceof NameCallback)
username = ((NameCallback) callback).getDefaultName();
- else if (callback instanceof ScramCredentialCallback) {
+ else if (callback instanceof DelegationTokenCredentialCallback) {
+ DelegationTokenCredentialCallback tokenCallback = (DelegationTokenCredentialCallback) callback;
+ tokenCallback.scramCredential(tokenCache.credential(saslMechanism, username));
+ tokenCallback.tokenOwner(tokenCache.owner(username));
+ } else if (callback instanceof ScramCredentialCallback) {
ScramCredentialCallback sc = (ScramCredentialCallback) callback;
- if (sc.tokenauth()) {
- sc.scramCredential(tokenCache.credential(sc.mechanism(), username));
- sc.tokenOwner(tokenCache.owner(username));
- } else
- sc.scramCredential(credentialCache.get(username));
+ sc.scramCredential(credentialCache.get(username));
} else
throw new UnsupportedCallbackException(callback);
}
@@ -60,6 +62,7 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler {
@Override
public void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism) {
+ this.saslMechanism = saslMechanism;
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
similarity index 66%
rename from clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
rename to clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
index df6e849..7490a3e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
@@ -14,19 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.kafka.common.security.token.delegation;
-package org.apache.kafka.common.security.scram;
+import org.apache.kafka.common.security.scram.ScramCredentialCallback;
-import javax.security.auth.callback.Callback;
+public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
+ private String tokenOwner;
-public class DelegationTokenAuthenticationCallback implements Callback {
- private boolean tokenauth;
-
- public String extension() {
- return ScramLoginModule.TOKEN_AUTH_CONFIG + "=" + Boolean.toString(tokenauth);
+ public void tokenOwner(String tokenOwner) {
+ this.tokenOwner = tokenOwner;
}
- public void tokenauth(Boolean tokenauth) {
- this.tokenauth = tokenauth;
+ public String tokenOwner() {
+ return tokenOwner;
}
}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index 787f5a7..a30c09f 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -119,7 +118,6 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
EasyMock.expect(server.getAuthorizationID()).andReturn("foo");
- EasyMock.expect(server.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG)).andReturn(false);
replayAll();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
index 58be6e1..7b04ede 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
@@ -21,12 +21,14 @@ import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import javax.security.sasl.SaslException;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.kafka.common.security.scram.ScramMessages.AbstractScramMessage;
@@ -70,7 +72,7 @@ public class ScramMessagesTest {
@Test
public void validClientFirstMessage() throws SaslException {
String nonce = formatter.secureRandomString();
- ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, "");
+ ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, Collections.<String, String>emptyMap());
checkClientFirstMessage(m, "someuser", nonce, "");
// Default format used by Kafka client: only user and nonce are specified
@@ -111,7 +113,7 @@ public class ScramMessagesTest {
//optional tokenauth specified as extensions
str = String.format("n,,n=testuser,r=%s,%s", nonce, "tokenauth=true");
m = createScramMessage(ClientFirstMessage.class, str);
- assertEquals("tokenauth=true", m.extensions());
+ assertTrue("Token authentication not set from extensions", m.extensions().tokenAuthenticated());
}
@Test
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.