You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/07 11:29:21 UTC

[kafka] branch trunk updated: KAFKA-6532: Reduce impact of delegation tokens on public interfaces (#4524)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 65b5ccf  KAFKA-6532: Reduce impact of delegation tokens on public interfaces (#4524)
65b5ccf is described below

commit 65b5ccf6413369aa4f21c72abcdf31ca72a79b00
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Feb 7 03:29:08 2018 -0800

    KAFKA-6532: Reduce impact of delegation tokens on public interfaces (#4524)
    
    Keep delegation token implementation internal without exposing implementation details to pluggable classes:
      1. KafkaPrincipal#tokenAuthenticated must always be set by SaslServerAuthenticator so that custom PrincipalBuilders cannot override.
      2. Replace o.a.k.c.security.scram.DelegationTokenAuthenticationCallback with a more generic ScramExtensionsCallback that can be used to add more extensions in future.
      3. Separate out ScramCredentialCallback (KIP-86 makes this a public interface) from delegation token credential callback (which is internal).
    
    Reviewers: Jun Rao <ju...@gmail.com>, Manikumar Reddy <ma...@gmail.com>
---
 .../kafka/common/security/auth/KafkaPrincipal.java | 12 ++--
 .../DefaultKafkaPrincipalBuilder.java              | 10 +--
 .../authenticator/SaslClientCallbackHandler.java   | 10 +--
 .../authenticator/SaslServerAuthenticator.java     |  7 +-
 .../security/scram/ScramCredentialCallback.java    | 24 -------
 .../common/security/scram/ScramExtensions.java     | 78 ++++++++++++++++++++++
 ...nCallback.java => ScramExtensionsCallback.java} | 14 ++--
 .../common/security/scram/ScramLoginModule.java    |  6 +-
 .../kafka/common/security/scram/ScramMessages.java | 31 +++------
 .../common/security/scram/ScramSaslClient.java     |  8 +--
 .../common/security/scram/ScramSaslServer.java     | 43 +++++++-----
 .../security/scram/ScramServerCallbackHandler.java | 15 +++--
 .../DelegationTokenCredentialCallback.java}        | 17 +++--
 .../auth/DefaultKafkaPrincipalBuilderTest.java     |  2 -
 .../common/security/scram/ScramMessagesTest.java   |  6 +-
 15 files changed, 170 insertions(+), 113 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
index 10bf76d..74bc9c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -47,16 +47,11 @@ public class KafkaPrincipal implements Principal {
 
     private final String principalType;
     private final String name;
-    private boolean tokenAuthenticated;
+    private volatile boolean tokenAuthenticated;
 
     public KafkaPrincipal(String principalType, String name) {
-       this(principalType, name, false);
-    }
-
-    public KafkaPrincipal(String principalType, String name, boolean tokenauth) {
         this.principalType = requireNonNull(principalType, "Principal type cannot be null");
         this.name = requireNonNull(name, "Principal name cannot be null");
-        this.tokenAuthenticated =  tokenauth;
     }
 
     /**
@@ -91,7 +86,6 @@ public class KafkaPrincipal implements Principal {
     public int hashCode() {
         int result = principalType != null ? principalType.hashCode() : 0;
         result = 31 * result + (name != null ? name.hashCode() : 0);
-        result = 31 * result + (tokenAuthenticated ? 1 : 0);
         return result;
     }
 
@@ -104,6 +98,10 @@ public class KafkaPrincipal implements Principal {
         return principalType;
     }
 
+    public void tokenAuthenticated(boolean tokenAuthenticated) {
+        this.tokenAuthenticated = tokenAuthenticated;
+    }
+
     public boolean tokenAuthenticated() {
         return tokenAuthenticated;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
index 7404238..30b0a3e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
 import org.apache.kafka.common.security.auth.SslAuthenticationContext;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.scram.ScramLoginModule;
 
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
@@ -119,13 +118,8 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
             SaslServer saslServer = ((SaslAuthenticationContext) context).server();
             if (SaslConfigs.GSSAPI_MECHANISM.equals(saslServer.getMechanismName()))
                 return applyKerberosShortNamer(saslServer.getAuthorizationID());
-            else {
-                Boolean isTokenAuthenticated = (Boolean) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG);
-                if (isTokenAuthenticated != null && isTokenAuthenticated)
-                    return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(), true);
-                else
-                    return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
-            }
+            else
+                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
         } else {
             throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index de96cef..31c51c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -28,7 +28,7 @@ import javax.security.sasl.RealmCallback;
 
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.scram.DelegationTokenAuthenticationCallback;
+import org.apache.kafka.common.security.scram.ScramExtensionsCallback;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL mechanism
@@ -81,10 +81,10 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler {
                 ac.setAuthorized(authId.equals(authzId));
                 if (ac.isAuthorized())
                     ac.setAuthorizedID(authzId);
-            } else if (callback instanceof DelegationTokenAuthenticationCallback) {
-                DelegationTokenAuthenticationCallback tc = (DelegationTokenAuthenticationCallback) callback;
-                if (!isKerberos && subject != null && !subject.getPublicCredentials(Boolean.class).isEmpty()) {
-                    tc.tokenauth(subject.getPublicCredentials(Boolean.class).iterator().next());
+            } else if (callback instanceof ScramExtensionsCallback) {
+                ScramExtensionsCallback sc = (ScramExtensionsCallback) callback;
+                if (!isKerberos && subject != null && !subject.getPublicCredentials(Map.class).isEmpty()) {
+                    sc.extensions((Map<String, String>) subject.getPublicCredentials(Map.class).iterator().next());
                 }
             }  else {
                 throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index ca6e9d2..2a80e5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.ScramMechanism;
 import org.apache.kafka.common.security.scram.ScramServerCallbackHandler;
 import org.apache.kafka.common.utils.Utils;
@@ -297,7 +298,11 @@ public class SaslServerAuthenticator implements Authenticator {
     @Override
     public KafkaPrincipal principal() {
         SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol, clientAddress());
-        return principalBuilder.build(context);
+        KafkaPrincipal principal = principalBuilder.build(context);
+        if (ScramMechanism.isScram(saslMechanism) && Boolean.parseBoolean((String) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG))) {
+            principal.tokenAuthenticated(true);
+        }
+        return principal;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
index 7f3601c..931210a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
@@ -20,36 +20,12 @@ import javax.security.auth.callback.Callback;
 
 public class ScramCredentialCallback implements Callback {
     private ScramCredential scramCredential;
-    private boolean tokenAuthenticated;
-    private String tokenOwner;
-    private String mechanism;
-
-    public ScramCredentialCallback(boolean tokenAuthenticated, String mechanism) {
-        this.tokenAuthenticated = tokenAuthenticated;
-        this.mechanism = mechanism;
-    }
 
     public ScramCredential scramCredential() {
         return scramCredential;
     }
 
-    public boolean tokenauth() {
-        return tokenAuthenticated;
-    }
-
     public void scramCredential(ScramCredential scramCredential) {
         this.scramCredential = scramCredential;
     }
-
-    public void tokenOwner(String tokenOwner) {
-        this.tokenOwner = tokenOwner;
-    }
-
-    public String tokenOwner() {
-        return tokenOwner;
-    }
-
-    public String mechanism() {
-        return mechanism;
-    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java
new file mode 100644
index 0000000..0f461c0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.scram;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class ScramExtensions {
+    private final Map<String, String> extensionMap;
+
+    public ScramExtensions() {
+        this(Collections.<String, String>emptyMap());
+    }
+
+    public ScramExtensions(String extensions) {
+        this(stringToMap(extensions));
+    }
+
+    public ScramExtensions(Map<String, String> extensionMap) {
+        this.extensionMap = extensionMap;
+    }
+
+    public String extensionValue(String name) {
+        return extensionMap.get(name);
+    }
+
+    public Set<String> extensionNames() {
+        return extensionMap.keySet();
+    }
+
+    public boolean tokenAuthenticated() {
+        return Boolean.parseBoolean(extensionMap.get(ScramLoginModule.TOKEN_AUTH_CONFIG));
+    }
+
+    @Override
+    public String toString() {
+        return mapToString(extensionMap);
+    }
+
+    private static Map<String, String> stringToMap(String extensions) {
+        Map<String, String> extensionMap = new HashMap<>();
+
+        if (!extensions.isEmpty()) {
+            String[] attrvals = extensions.split(",");
+            for (String attrval : attrvals) {
+                String[] array = attrval.split("=", 2);
+                extensionMap.put(array[0], array[1]);
+            }
+        }
+        return extensionMap;
+    }
+
+    private static String mapToString(Map<String, String> extensionMap) {
+        StringBuilder builder = new StringBuilder();
+        for (Map.Entry<String, String> entry : extensionMap.entrySet()) {
+            builder.append(entry.getKey());
+            builder.append('=');
+            builder.append(entry.getValue());
+        }
+        return builder.toString();
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
similarity index 72%
copy from clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
copy to clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
index df6e849..b40468b 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
@@ -18,15 +18,17 @@
 package org.apache.kafka.common.security.scram;
 
 import javax.security.auth.callback.Callback;
+import java.util.Collections;
+import java.util.Map;
 
-public class DelegationTokenAuthenticationCallback implements Callback {
-    private boolean tokenauth;
+public class ScramExtensionsCallback implements Callback {
+    private Map<String, String> extensions = Collections.emptyMap();
 
-    public String extension() {
-        return ScramLoginModule.TOKEN_AUTH_CONFIG + "=" +  Boolean.toString(tokenauth);
+    public Map<String, String> extensions() {
+        return extensions;
     }
 
-    public void tokenauth(Boolean tokenauth) {
-        this.tokenauth = tokenauth;
+    public void extensions(Map<String, String> extensions) {
+        this.extensions = extensions;
     }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
index 8000f4c..43df515 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.security.scram;
 
+import java.util.Collections;
 import java.util.Map;
 
 import javax.security.auth.Subject;
@@ -44,7 +45,10 @@ public class ScramLoginModule implements LoginModule {
             subject.getPrivateCredentials().add(password);
 
         Boolean useTokenAuthentication = "true".equalsIgnoreCase((String) options.get(TOKEN_AUTH_CONFIG));
-        subject.getPublicCredentials().add(useTokenAuthentication);
+        if (useTokenAuthentication) {
+            Map<String, String> scramExtensions = Collections.singletonMap(TOKEN_AUTH_CONFIG, "true");
+            subject.getPublicCredentials().add(scramExtensions);
+        }
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
index e697ea5..05b3d77 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.security.scram;
 import org.apache.kafka.common.utils.Base64;
 
 import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -77,7 +76,7 @@ public class ScramMessages {
         private final String saslName;
         private final String nonce;
         private final String authorizationId;
-        private final String extensions;
+        private final ScramExtensions extensions;
         public ClientFirstMessage(byte[] messageBytes) throws SaslException {
             String message = toMessage(messageBytes);
             Matcher matcher = PATTERN.matcher(message);
@@ -88,12 +87,13 @@ public class ScramMessages {
             this.saslName = matcher.group("saslname");
             this.nonce = matcher.group("nonce");
             String extString = matcher.group("extensions");
-            this.extensions = extString.startsWith(",") ? extString.substring(1) : extString;
+
+            this.extensions = extString.startsWith(",") ? new ScramExtensions(extString.substring(1)) : new ScramExtensions();
         }
-        public ClientFirstMessage(String saslName, String nonce, String extensions) {
+        public ClientFirstMessage(String saslName, String nonce, Map<String, String> extensions) {
             this.saslName = saslName;
             this.nonce = nonce;
-            this.extensions = extensions;
+            this.extensions = new ScramExtensions(extensions);
             this.authorizationId = ""; // Optional authzid not specified in gs2-header
         }
         public String saslName() {
@@ -108,29 +108,16 @@ public class ScramMessages {
         public String gs2Header() {
             return "n," + authorizationId + ",";
         }
-        public String extensions() {
+        public ScramExtensions extensions() {
             return extensions;
         }
 
-        public Map<String, String> extensionsAsMap() {
-            Map<String, String> map = new HashMap<>();
-
-            if (extensions.isEmpty())
-                return map;
-
-            String[] attrvals = extensions.split(",");
-            for (String attrval: attrvals) {
-                String[] array = attrval.split("=", 2);
-                map.put(array[0], array[1]);
-            }
-            return map;
-        }
-
         public String clientFirstMessageBare() {
-            if (extensions.isEmpty())
+            String extensionStr = extensions.toString();
+            if (extensionStr.isEmpty())
                 return String.format("n=%s,r=%s", saslName, nonce);
             else
-                return String.format("n=%s,r=%s,%s", saslName, nonce, extensions);
+                return String.format("n=%s,r=%s,%s", saslName, nonce, extensionStr);
         }
         String toMessage() {
             return gs2Header() + clientFirstMessageBare();
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
index 6b66f5e..71109df 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
@@ -97,18 +97,18 @@ public class ScramSaslClient implements SaslClient {
                         throw new SaslException("Expected empty challenge");
                     clientNonce = formatter.secureRandomString();
                     NameCallback nameCallback = new NameCallback("Name:");
-                    DelegationTokenAuthenticationCallback tokenAuthCallback = new DelegationTokenAuthenticationCallback();
+                    ScramExtensionsCallback extensionsCallback = new ScramExtensionsCallback();
 
                     try {
-                        callbackHandler.handle(new Callback[]{nameCallback, tokenAuthCallback});
+                        callbackHandler.handle(new Callback[]{nameCallback, extensionsCallback});
                     } catch (IOException | UnsupportedCallbackException e) {
                         throw new SaslException("User name could not be obtained", e);
                     }
 
                     String username = nameCallback.getName();
                     String saslName = formatter.saslName(username);
-                    String extension = tokenAuthCallback.extension();
-                    this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extension);
+                    Map<String, String> extensions = extensionsCallback.extensions();
+                    this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extensions);
                     setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
                     return clientFirstMessage.toBytes();
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
index 94b92b6..314c1d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
@@ -22,6 +22,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -37,6 +38,8 @@ import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
 import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
 import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
 import org.apache.kafka.common.security.scram.ScramMessages.ServerFirstMessage;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +53,7 @@ import org.slf4j.LoggerFactory;
 public class ScramSaslServer implements SaslServer {
 
     private static final Logger log = LoggerFactory.getLogger(ScramSaslServer.class);
+    private static final Set<String> SUPPORTED_EXTENSIONS = Utils.mkSet(ScramLoginModule.TOKEN_AUTH_CONFIG);
 
     enum State {
         RECEIVE_CLIENT_FIRST_MESSAGE,
@@ -65,9 +69,9 @@ public class ScramSaslServer implements SaslServer {
     private String username;
     private ClientFirstMessage clientFirstMessage;
     private ServerFirstMessage serverFirstMessage;
+    private ScramExtensions scramExtensions;
     private ScramCredential scramCredential;
-    private boolean tokenAuthentication;
-    private String tokenOwner;
+    private String authorizationId;
 
     public ScramSaslServer(ScramMechanism mechanism, Map<String, ?> props, CallbackHandler callbackHandler) throws NoSuchAlgorithmException {
         this.mechanism = mechanism;
@@ -91,18 +95,29 @@ public class ScramSaslServer implements SaslServer {
             switch (state) {
                 case RECEIVE_CLIENT_FIRST_MESSAGE:
                     this.clientFirstMessage = new ClientFirstMessage(response);
+                    this.scramExtensions = clientFirstMessage.extensions();
+                    if (!SUPPORTED_EXTENSIONS.containsAll(scramExtensions.extensionNames())) {
+                        log.debug("Unsupported extensions will be ignored, supported {}, provided {}",
+                                SUPPORTED_EXTENSIONS, scramExtensions.extensionNames());
+                    }
                     String serverNonce = formatter.secureRandomString();
                     try {
                         String saslName = clientFirstMessage.saslName();
                         this.username = formatter.username(saslName);
-                        Map<String, String> extensions = clientFirstMessage.extensionsAsMap();
-                        this.tokenAuthentication = "true".equalsIgnoreCase(extensions.get(ScramLoginModule.TOKEN_AUTH_CONFIG));
                         NameCallback nameCallback = new NameCallback("username", username);
-                        ScramCredentialCallback credentialCallback = new ScramCredentialCallback(tokenAuthentication, getMechanismName());
-                        callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
-                        this.tokenOwner = credentialCallback.tokenOwner();
-                        if (tokenAuthentication && tokenOwner == null)
-                            throw new SaslException("Token Authentication failed: Invalid tokenId : " + username);
+                        ScramCredentialCallback credentialCallback;
+                        if (scramExtensions.tokenAuthenticated()) {
+                            DelegationTokenCredentialCallback tokenCallback = new DelegationTokenCredentialCallback();
+                            credentialCallback = tokenCallback;
+                            callbackHandler.handle(new Callback[]{nameCallback, tokenCallback});
+                            if (tokenCallback.tokenOwner() == null)
+                                throw new SaslException("Token Authentication failed: Invalid tokenId : " + username);
+                            this.authorizationId = tokenCallback.tokenOwner();
+                        } else {
+                            credentialCallback = new ScramCredentialCallback();
+                            callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
+                            this.authorizationId = username;
+                        }
                         this.scramCredential = credentialCallback.scramCredential();
                         if (scramCredential == null)
                             throw new SaslException("Authentication failed: Invalid user credentials");
@@ -150,11 +165,7 @@ public class ScramSaslServer implements SaslServer {
     public String getAuthorizationID() {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
-
-        if (tokenAuthentication)
-            return tokenOwner; // return token owner as principal for this session
-
-        return username;
+        return authorizationId;
     }
 
     @Override
@@ -167,8 +178,8 @@ public class ScramSaslServer implements SaslServer {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
 
-        if (ScramLoginModule.TOKEN_AUTH_CONFIG.equals(propName))
-            return tokenAuthentication;
+        if (SUPPORTED_EXTENSIONS.contains(propName))
+            return scramExtensions.extensionValue(propName);
         else
             return null;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
index a064e8a..5e37eae 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
@@ -28,11 +28,13 @@ import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.security.authenticator.AuthCallbackHandler;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
 
 public class ScramServerCallbackHandler implements AuthCallbackHandler {
 
     private final CredentialCache.Cache<ScramCredential> credentialCache;
     private final DelegationTokenCache tokenCache;
+    private String saslMechanism;
 
     public ScramServerCallbackHandler(CredentialCache.Cache<ScramCredential> credentialCache,
                                       DelegationTokenCache tokenCache) {
@@ -46,13 +48,13 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler {
         for (Callback callback : callbacks) {
             if (callback instanceof NameCallback)
                 username = ((NameCallback) callback).getDefaultName();
-            else if (callback instanceof ScramCredentialCallback) {
+            else if (callback instanceof DelegationTokenCredentialCallback) {
+                DelegationTokenCredentialCallback tokenCallback = (DelegationTokenCredentialCallback) callback;
+                tokenCallback.scramCredential(tokenCache.credential(saslMechanism, username));
+                tokenCallback.tokenOwner(tokenCache.owner(username));
+            } else if (callback instanceof ScramCredentialCallback) {
                 ScramCredentialCallback sc = (ScramCredentialCallback) callback;
-                if (sc.tokenauth()) {
-                    sc.scramCredential(tokenCache.credential(sc.mechanism(), username));
-                    sc.tokenOwner(tokenCache.owner(username));
-                } else
-                    sc.scramCredential(credentialCache.get(username));
+                sc.scramCredential(credentialCache.get(username));
             } else
                 throw new UnsupportedCallbackException(callback);
         }
@@ -60,6 +62,7 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler {
 
     @Override
     public void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism) {
+        this.saslMechanism = saslMechanism;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
similarity index 66%
rename from clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
rename to clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
index df6e849..7490a3e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
@@ -14,19 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.common.security.token.delegation;
 
-package org.apache.kafka.common.security.scram;
+import org.apache.kafka.common.security.scram.ScramCredentialCallback;
 
-import javax.security.auth.callback.Callback;
+public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
+    private String tokenOwner;
 
-public class DelegationTokenAuthenticationCallback implements Callback {
-    private boolean tokenauth;
-
-    public String extension() {
-        return ScramLoginModule.TOKEN_AUTH_CONFIG + "=" +  Boolean.toString(tokenauth);
+    public void tokenOwner(String tokenOwner) {
+        this.tokenOwner = tokenOwner;
     }
 
-    public void tokenauth(Boolean tokenauth) {
-        this.tokenauth = tokenauth;
+    public String tokenOwner() {
+        return tokenOwner;
     }
 }
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index 787f5a7..a30c09f 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.security.scram.ScramMechanism;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -119,7 +118,6 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
 
         EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
         EasyMock.expect(server.getAuthorizationID()).andReturn("foo");
-        EasyMock.expect(server.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG)).andReturn(false);
 
         replayAll();
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
index 58be6e1..7b04ede 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
@@ -21,12 +21,14 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 
 import javax.security.sasl.SaslException;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.kafka.common.security.scram.ScramMessages.AbstractScramMessage;
@@ -70,7 +72,7 @@ public class ScramMessagesTest {
     @Test
     public void validClientFirstMessage() throws SaslException {
         String nonce = formatter.secureRandomString();
-        ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, "");
+        ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, Collections.<String, String>emptyMap());
         checkClientFirstMessage(m, "someuser", nonce, "");
 
         // Default format used by Kafka client: only user and nonce are specified
@@ -111,7 +113,7 @@ public class ScramMessagesTest {
         //optional tokenauth specified as extensions
         str = String.format("n,,n=testuser,r=%s,%s", nonce, "tokenauth=true");
         m = createScramMessage(ClientFirstMessage.class, str);
-        assertEquals("tokenauth=true", m.extensions());
+        assertTrue("Token authentication not set from extensions", m.extensions().tokenAuthenticated());
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.