You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/10/08 09:14:37 UTC
[kafka] branch trunk updated: KAFKA-7462: Make token optional for OAuthBearerLoginModule (#5733)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c26823b KAFKA-7462: Make token optional for OAuthBearerLoginModule (#5733)
c26823b is described below
commit c26823b01e61f301bc2fbd3fe54665e5b92284d1
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon Oct 8 10:14:25 2018 +0100
KAFKA-7462: Make token optional for OAuthBearerLoginModule (#5733)
OAuthBearerLoginModule is used both on the server-side and client-side (similar to login modules for other mechanisms). OAUTHBEARER tokens are client credentials used only on the client-side to authenticate with servers, but the current implementation requires tokens to be provided on the server-side even if OAUTHBEARER is not used for inter-broker communication. This commit makes tokens optional for server-side login context to allow brokers to be configured without a token when OAUTHBEARER is not used for inter-broker communication.
Reviewers: Ron Dagostino <rn...@gmail.com>, Jun Rao <ju...@gmail.com>
---
.../oauthbearer/OAuthBearerLoginModule.java | 107 ++++++++++++++-------
.../oauthbearer/OAuthBearerTokenCallback.java | 4 +-
.../OAuthBearerUnsecuredLoginCallbackHandler.java | 8 ++
.../authenticator/SaslAuthenticatorTest.java | 35 +++++++
4 files changed, 119 insertions(+), 35 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
index 1dcd199..e3a7810 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -236,6 +236,21 @@ import org.slf4j.LoggerFactory;
* @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
*/
public class OAuthBearerLoginModule implements LoginModule {
+
+ /**
+ * Login state transitions:
+ * Initial state: NOT_LOGGED_IN
+ * login() : NOT_LOGGED_IN => LOGGED_IN_NOT_COMMITTED
+ * commit() : LOGGED_IN_NOT_COMMITTED => COMMITTED
+ * abort() : LOGGED_IN_NOT_COMMITTED => NOT_LOGGED_IN
+ * logout() : Any state => NOT_LOGGED_IN
+ */
+ private enum LoginState {
+ NOT_LOGGED_IN,
+ LOGGED_IN_NOT_COMMITTED,
+ COMMITTED
+ }
+
/**
* The SASL Mechanism name for OAuth 2: {@code OAUTHBEARER}
*/
@@ -248,6 +263,7 @@ public class OAuthBearerLoginModule implements LoginModule {
private OAuthBearerToken myCommittedToken = null;
private SaslExtensions extensionsRequiringCommit = null;
private SaslExtensions myCommittedExtensions = null;
+ private LoginState loginState;
static {
OAuthBearerSaslClientProvider.initialize(); // not part of public API
@@ -266,17 +282,29 @@ public class OAuthBearerLoginModule implements LoginModule {
@Override
public boolean login() throws LoginException {
- if (tokenRequiringCommit != null)
- throw new IllegalStateException(String.format(
+ if (loginState == LoginState.LOGGED_IN_NOT_COMMITTED) {
+ if (tokenRequiringCommit != null)
+ throw new IllegalStateException(String.format(
"Already have an uncommitted token with private credential token count=%d", committedTokenCount()));
- if (myCommittedToken != null)
- throw new IllegalStateException(String.format(
+ else
+ throw new IllegalStateException("Already logged in without a token");
+ }
+ if (loginState == LoginState.COMMITTED) {
+ if (myCommittedToken != null)
+ throw new IllegalStateException(String.format(
"Already have a committed token with private credential token count=%d; must login on another login context or logout here first before reusing the same login context",
committedTokenCount()));
+ else
+ throw new IllegalStateException("Login has already been committed without a token");
+ }
identifyToken();
- identifyExtensions();
+ if (tokenRequiringCommit != null)
+ identifyExtensions();
+ else
+ log.debug("Logged in without a token, this login cannot be used to establish client connections");
+ loginState = LoginState.LOGGED_IN_NOT_COMMITTED;
log.info("Login succeeded; invoke commit() to commit it; current committed token count={}",
committedTokenCount());
return true;
@@ -292,7 +320,7 @@ public class OAuthBearerLoginModule implements LoginModule {
}
tokenRequiringCommit = tokenCallback.token();
- if (tokenRequiringCommit == null) {
+ if (tokenCallback.errorCode() != null) {
log.info("Login failed: {} : {} (URI={})", tokenCallback.errorCode(), tokenCallback.errorDescription(),
tokenCallback.errorUri());
throw new LoginException(tokenCallback.errorDescription());
@@ -322,64 +350,77 @@ public class OAuthBearerLoginModule implements LoginModule {
@Override
public boolean logout() {
- if (tokenRequiringCommit != null)
+ if (loginState == LoginState.LOGGED_IN_NOT_COMMITTED)
throw new IllegalStateException(
"Cannot call logout() immediately after login(); need to first invoke commit() or abort()");
- if (myCommittedToken == null) {
+ if (loginState != LoginState.COMMITTED) {
if (log.isDebugEnabled())
log.debug("Nothing here to log out");
return false;
}
- log.info("Logging out my token; current committed token count = {}", committedTokenCount());
- for (Iterator<Object> iterator = subject.getPrivateCredentials().iterator(); iterator.hasNext();) {
- Object privateCredential = iterator.next();
- if (privateCredential == myCommittedToken) {
- iterator.remove();
- myCommittedToken = null;
- break;
+ if (myCommittedToken != null) {
+ log.info("Logging out my token; current committed token count = {}", committedTokenCount());
+ for (Iterator<Object> iterator = subject.getPrivateCredentials().iterator(); iterator.hasNext(); ) {
+ Object privateCredential = iterator.next();
+ if (privateCredential == myCommittedToken) {
+ iterator.remove();
+ myCommittedToken = null;
+ break;
+ }
}
- }
- log.info("Done logging out my token; committed token count is now {}", committedTokenCount());
+ log.info("Done logging out my token; committed token count is now {}", committedTokenCount());
+ } else
+ log.debug("No tokens to logout for this login");
- log.info("Logging out my extensions");
- if (subject.getPublicCredentials().removeIf(e -> myCommittedExtensions == e))
- myCommittedExtensions = null;
- log.info("Done logging out my extensions");
+ if (myCommittedExtensions != null) {
+ log.info("Logging out my extensions");
+ if (subject.getPublicCredentials().removeIf(e -> myCommittedExtensions == e))
+ myCommittedExtensions = null;
+ log.info("Done logging out my extensions");
+ } else
+ log.debug("No extensions to logout for this login");
+ loginState = LoginState.NOT_LOGGED_IN;
return true;
}
@Override
public boolean commit() {
- if (tokenRequiringCommit == null) {
+ if (loginState != LoginState.LOGGED_IN_NOT_COMMITTED) {
if (log.isDebugEnabled())
log.debug("Nothing here to commit");
return false;
}
- log.info("Committing my token; current committed token count = {}", committedTokenCount());
- subject.getPrivateCredentials().add(tokenRequiringCommit);
- myCommittedToken = tokenRequiringCommit;
- tokenRequiringCommit = null;
- log.info("Done committing my token; committed token count is now {}", committedTokenCount());
+ if (tokenRequiringCommit != null) {
+ log.info("Committing my token; current committed token count = {}", committedTokenCount());
+ subject.getPrivateCredentials().add(tokenRequiringCommit);
+ myCommittedToken = tokenRequiringCommit;
+ tokenRequiringCommit = null;
+ log.info("Done committing my token; committed token count is now {}", committedTokenCount());
+ } else
+ log.debug("No tokens to commit, this login cannot be used to establish client connections");
- subject.getPublicCredentials().add(extensionsRequiringCommit);
- myCommittedExtensions = extensionsRequiringCommit;
- extensionsRequiringCommit = null;
+ if (extensionsRequiringCommit != null) {
+ subject.getPublicCredentials().add(extensionsRequiringCommit);
+ myCommittedExtensions = extensionsRequiringCommit;
+ extensionsRequiringCommit = null;
+ }
+ loginState = LoginState.COMMITTED;
return true;
}
@Override
public boolean abort() {
- if (tokenRequiringCommit != null) {
+ if (loginState == LoginState.LOGGED_IN_NOT_COMMITTED) {
log.info("Login aborted");
tokenRequiringCommit = null;
extensionsRequiringCommit = null;
+ loginState = LoginState.NOT_LOGGED_IN;
return true;
}
- if (log.isDebugEnabled())
- log.debug("Nothing here to abort");
+ log.debug("Nothing here to abort");
return false;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java
index 62ce492..3f4f269 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerTokenCallback.java
@@ -90,10 +90,10 @@ public class OAuthBearerTokenCallback implements Callback {
* Set the token. All error-related values are cleared.
*
* @param token
- * the mandatory token to set
+ * the optional token to set
*/
public void token(OAuthBearerToken token) {
- this.token = Objects.requireNonNull(token);
+ this.token = token;
this.errorCode = null;
this.errorDescription = null;
this.errorUri = null;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
index 8d259e3..e7a4f2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -182,6 +182,14 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
private void handleTokenCallback(OAuthBearerTokenCallback callback) {
if (callback.token() != null)
throw new IllegalArgumentException("Callback had a token already");
+ if (moduleOptions.isEmpty()) {
+ log.debug("Token not provided, this login cannot be used to establish client connections");
+ callback.token(null);
+ return;
+ }
+ if (moduleOptions.keySet().stream().noneMatch(name -> !name.startsWith(EXTENSION_PREFIX))) {
+ throw new OAuthBearerConfigException("Extensions provided in login context without a token");
+ }
String principalClaimNameValue = optionValue(PRINCIPAL_CLAIM_NAME_OPTION);
String principalClaimName = principalClaimNameValue != null && !principalClaimNameValue.trim().isEmpty()
? principalClaimNameValue.trim()
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index dfefabb..297cba5 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -93,6 +93,7 @@ import org.apache.kafka.common.security.authenticator.TestDigestLoginModule.Dige
import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -1208,6 +1209,40 @@ public class SaslAuthenticatorTest {
}
/**
+ * Tests OAUTHBEARER client channels without tokens for the server.
+ */
+ @Test
+ public void testValidSaslOauthBearerMechanismWithoutServerTokens() throws Exception {
+ String node = "0";
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+ saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+ saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("OAUTHBEARER"));
+ saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+ TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.singletonMap("unsecuredLoginStringClaim_sub", TestJaasConfig.USERNAME)));
+ saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer." + SaslConfigs.SASL_JAAS_CONFIG,
+ TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.emptyMap()));
+
+ // Server without a token should start up successfully and authenticate clients.
+ server = createEchoServer(securityProtocol);
+ createAndCheckClientConnection(securityProtocol, node);
+
+ // Client without a token should fail to connect
+ saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
+ TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.emptyMap()));
+ createAndCheckClientConnectionFailure(securityProtocol, node);
+
+ // Server with extensions, but without a token should fail to start up since it could indicate a configuration error
+ saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer." + SaslConfigs.SASL_JAAS_CONFIG,
+ TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.singletonMap("unsecuredLoginExtension_test", "something")));
+ try {
+ createEchoServer(securityProtocol);
+ fail("Server created with invalid login config containing extensions without a token");
+ } catch (Throwable e) {
+ assertTrue("Unexpected exception " + Utils.stackTrace(e), e.getCause() instanceof LoginException);
+ }
+ }
+
+ /**
* Tests OAUTHBEARER fails the connection when the client presents a token with
* insufficient scope .
*/