You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/10/08 09:14:37 UTC

[kafka] branch trunk updated: KAFKA-7462: Make token optional for OAuthBearerLoginModule (#5733)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c26823b  KAFKA-7462: Make token optional for OAuthBearerLoginModule (#5733)
c26823b is described below

commit c26823b01e61f301bc2fbd3fe54665e5b92284d1
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon Oct 8 10:14:25 2018 +0100

    KAFKA-7462: Make token optional for OAuthBearerLoginModule (#5733)
    
    OAuthBearerLoginModule is used both on the server-side and client-side (similar to login modules for other mechanisms). OAUTHBEARER tokens are client credentials used only on the client-side to authenticate with servers, but the current implementation requires tokens to be provided on the server-side even if OAUTHBEARER is not used for inter-broker communication. This commit makes tokens optional for server-side login context to allow brokers to be configured without a token when OAUT [...]
    
    Reviewers: Ron Dagostino <rn...@gmail.com>, Jun Rao <ju...@gmail.com>
---
 .../oauthbearer/OAuthBearerLoginModule.java        | 107 ++++++++++++++-------
 .../oauthbearer/OAuthBearerTokenCallback.java      |   4 +-
 .../OAuthBearerUnsecuredLoginCallbackHandler.java  |   8 ++
 .../authenticator/SaslAuthenticatorTest.java       |  35 +++++++
 4 files changed, 119 insertions(+), 35 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
index 1dcd199..e3a7810 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -236,6 +236,21 @@ import org.slf4j.LoggerFactory;
  * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
  */
 public class OAuthBearerLoginModule implements LoginModule {
+
+    /**
+     * Login state transitions:
+     *   Initial state: NOT_LOGGED_IN
+     *   login()      : NOT_LOGGED_IN => LOGGED_IN_NOT_COMMITTED
+     *   commit()     : LOGGED_IN_NOT_COMMITTED => COMMITTED
+     *   abort()      : LOGGED_IN_NOT_COMMITTED => NOT_LOGGED_IN
+     *   logout()     : Any state => NOT_LOGGED_IN
+     */
+    private enum LoginState {
+        NOT_LOGGED_IN,
+        LOGGED_IN_NOT_COMMITTED,
+        COMMITTED
+    }
+
     /**
      * The SASL Mechanism name for OAuth 2: {@code OAUTHBEARER}
      */
@@ -248,6 +263,7 @@ public class OAuthBearerLoginModule implements LoginModule {
     private OAuthBearerToken myCommittedToken = null;
     private SaslExtensions extensionsRequiringCommit = null;
     private SaslExtensions myCommittedExtensions = null;
+    private LoginState loginState;
 
     static {
         OAuthBearerSaslClientProvider.initialize(); // not part of public API
@@ -266,17 +282,29 @@ public class OAuthBearerLoginModule implements LoginModule {
 
     @Override
     public boolean login() throws LoginException {
-        if (tokenRequiringCommit != null)
-            throw new IllegalStateException(String.format(
+        if (loginState == LoginState.LOGGED_IN_NOT_COMMITTED) {
+            if (tokenRequiringCommit != null)
+                throw new IllegalStateException(String.format(
                     "Already have an uncommitted token with private credential token count=%d", committedTokenCount()));
-        if (myCommittedToken != null)
-            throw new IllegalStateException(String.format(
+            else
+                throw new IllegalStateException("Already logged in without a token");
+        }
+        if (loginState == LoginState.COMMITTED) {
+            if (myCommittedToken != null)
+                throw new IllegalStateException(String.format(
                     "Already have a committed token with private credential token count=%d; must login on another login context or logout here first before reusing the same login context",
                     committedTokenCount()));
+            else
+                throw new IllegalStateException("Login has already been committed without a token");
+        }
 
         identifyToken();
-        identifyExtensions();
+        if (tokenRequiringCommit != null)
+            identifyExtensions();
+        else
+            log.debug("Logged in without a token, this login cannot be used to establish client connections");
 
+        loginState = LoginState.LOGGED_IN_NOT_COMMITTED;
         log.info("Login succeeded; invoke commit() to commit it; current committed token count={}",
                 committedTokenCount());
         return true;
@@ -292,7 +320,7 @@ public class OAuthBearerLoginModule implements LoginModule {
         }
 
         tokenRequiringCommit = tokenCallback.token();
-        if (tokenRequiringCommit == null) {
+        if (tokenCallback.errorCode() != null) {
             log.info("Login failed: {} : {} (URI={})", tokenCallback.errorCode(), tokenCallback.errorDescription(),
                     tokenCallback.errorUri());
             throw new LoginException(tokenCallback.errorDescription());
@@ -322,64 +350,77 @@ public class OAuthBearerLoginModule implements LoginModule {
 
     @Override
     public boolean logout() {
-        if (tokenRequiringCommit != null)
+        if (loginState == LoginState.LOGGED_IN_NOT_COMMITTED)
             throw new IllegalStateException(
                     "Cannot call logout() immediately after login(); need to first invoke commit() or abort()");
-        if (myCommittedToken == null) {
+        if (loginState != LoginState.COMMITTED) {
             if (log.isDebugEnabled())
                 log.debug("Nothing here to log out");
             return false;
         }
-        log.info("Logging out my token; current committed token count = {}", committedTokenCount());
-        for (Iterator<Object> iterator = subject.getPrivateCredentials().iterator(); iterator.hasNext();) {
-            Object privateCredential = iterator.next();
-            if (privateCredential == myCommittedToken) {
-                iterator.remove();
-                myCommittedToken = null;
-                break;
+        if (myCommittedToken != null) {
+            log.info("Logging out my token; current committed token count = {}", committedTokenCount());
+            for (Iterator<Object> iterator = subject.getPrivateCredentials().iterator(); iterator.hasNext(); ) {
+                Object privateCredential = iterator.next();
+                if (privateCredential == myCommittedToken) {
+                    iterator.remove();
+                    myCommittedToken = null;
+                    break;
+                }
             }
-        }
-        log.info("Done logging out my token; committed token count is now {}", committedTokenCount());
+            log.info("Done logging out my token; committed token count is now {}", committedTokenCount());
+        } else
+            log.debug("No tokens to logout for this login");
 
-        log.info("Logging out my extensions");
-        if (subject.getPublicCredentials().removeIf(e -> myCommittedExtensions == e))
-            myCommittedExtensions = null;
-        log.info("Done logging out my extensions");
+        if (myCommittedExtensions != null) {
+            log.info("Logging out my extensions");
+            if (subject.getPublicCredentials().removeIf(e -> myCommittedExtensions == e))
+                myCommittedExtensions = null;
+            log.info("Done logging out my extensions");
+        } else
+            log.debug("No extensions to logout for this login");
 
+        loginState = LoginState.NOT_LOGGED_IN;
         return true;
     }
 
     @Override
     public boolean commit() {
-        if (tokenRequiringCommit == null) {
+        if (loginState != LoginState.LOGGED_IN_NOT_COMMITTED) {
             if (log.isDebugEnabled())
                 log.debug("Nothing here to commit");
             return false;
         }
 
-        log.info("Committing my token; current committed token count = {}", committedTokenCount());
-        subject.getPrivateCredentials().add(tokenRequiringCommit);
-        myCommittedToken = tokenRequiringCommit;
-        tokenRequiringCommit = null;
-        log.info("Done committing my token; committed token count is now {}", committedTokenCount());
+        if (tokenRequiringCommit != null) {
+            log.info("Committing my token; current committed token count = {}", committedTokenCount());
+            subject.getPrivateCredentials().add(tokenRequiringCommit);
+            myCommittedToken = tokenRequiringCommit;
+            tokenRequiringCommit = null;
+            log.info("Done committing my token; committed token count is now {}", committedTokenCount());
+        } else
+            log.debug("No tokens to commit, this login cannot be used to establish client connections");
 
-        subject.getPublicCredentials().add(extensionsRequiringCommit);
-        myCommittedExtensions = extensionsRequiringCommit;
-        extensionsRequiringCommit = null;
+        if (extensionsRequiringCommit != null) {
+            subject.getPublicCredentials().add(extensionsRequiringCommit);
+            myCommittedExtensions = extensionsRequiringCommit;
+            extensionsRequiringCommit = null;
+        }
 
+        loginState = LoginState.COMMITTED;
         return true;
     }
 
     @Override
     public boolean abort() {
-        if (tokenRequiringCommit != null) {
+        if (loginState == LoginState.LOGGED_IN_NOT_COMMITTED) {
             log.info("Login aborted");
             tokenRequiringCommit = null;
             extensionsRequiringCommit = null;
+            loginState = LoginState.NOT_LOGGED_IN;
             return true;
         }
-        if (log.isDebugEnabled())
-            log.debug("Nothing here to abort");
+        log.debug("Nothing here to abort");
         return false;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java
index 62ce492..3f4f269 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java
@@ -90,10 +90,10 @@ public class OAuthBearerTokenCallback implements Callback {
      * Set the token. All error-related values are cleared.
      * 
      * @param token
-     *            the mandatory token to set
+     *            the optional token to set
      */
     public void token(OAuthBearerToken token) {
-        this.token = Objects.requireNonNull(token);
+        this.token = token;
         this.errorCode = null;
         this.errorDescription = null;
         this.errorUri = null;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
index 8d259e3..e7a4f2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -182,6 +182,14 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
     private void handleTokenCallback(OAuthBearerTokenCallback callback) {
         if (callback.token() != null)
             throw new IllegalArgumentException("Callback had a token already");
+        if (moduleOptions.isEmpty()) {
+            log.debug("Token not provided, this login cannot be used to establish client connections");
+            callback.token(null);
+            return;
+        }
+        if (moduleOptions.keySet().stream().noneMatch(name -> !name.startsWith(EXTENSION_PREFIX))) {
+            throw new OAuthBearerConfigException("Extensions provided in login context without a token");
+        }
         String principalClaimNameValue = optionValue(PRINCIPAL_CLAIM_NAME_OPTION);
         String principalClaimName = principalClaimNameValue != null && !principalClaimNameValue.trim().isEmpty()
                 ? principalClaimNameValue.trim()
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index dfefabb..297cba5 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -93,6 +93,7 @@ import org.apache.kafka.common.security.authenticator.TestDigestLoginModule.Dige
 import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -1208,6 +1209,40 @@ public class SaslAuthenticatorTest {
     }
 
     /**
+     * Tests OAUTHBEARER client channels without tokens for the server.
+     */
+    @Test
+    public void testValidSaslOauthBearerMechanismWithoutServerTokens() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+        saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("OAUTHBEARER"));
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.singletonMap("unsecuredLoginStringClaim_sub", TestJaasConfig.USERNAME)));
+        saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer." + SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.emptyMap()));
+
+        // Server without a token should start up successfully and authenticate clients.
+        server = createEchoServer(securityProtocol);
+        createAndCheckClientConnection(securityProtocol, node);
+
+        // Client without a token should fail to connect
+        saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.emptyMap()));
+        createAndCheckClientConnectionFailure(securityProtocol, node);
+
+        // Server with extensions, but without a token should fail to start up since it could indicate a configuration error
+        saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer." + SaslConfigs.SASL_JAAS_CONFIG,
+                TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.singletonMap("unsecuredLoginExtension_test", "something")));
+        try {
+            createEchoServer(securityProtocol);
+            fail("Server created with invalid login config containing extensions without a token");
+        } catch (Throwable e) {
+            assertTrue("Unexpected exception " + Utils.stackTrace(e), e.getCause() instanceof LoginException);
+        }
+    }
+
+    /**
      * Tests OAUTHBEARER fails the connection when the client presents a token with
      * insufficient scope .
      */