You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/04/19 01:25:10 UTC
[pulsar] branch master updated: [fix][broker] Implement authenticateAsync for AuthenticationProviderList (#20132)
This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 58ccf020fc0 [fix][broker] Implement authenticateAsync for AuthenticationProviderList (#20132)
58ccf020fc0 is described below
commit 58ccf020fc0fa5f7d9b70032ae60c8bf9a3f234f
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Tue Apr 18 20:24:58 2023 -0500
[fix][broker] Implement authenticateAsync for AuthenticationProviderList (#20132)
PIP: #12105 and #19771
### Motivation
With the implementation of asynchronous authentication in PIP 97, I missed a case in the `AuthenticationProviderList` where we need to implement the `authenticateAsync` methods. This PR is required for the `AuthenticationProviderToken` and the `AuthenticationProviderOpenID` to work together, which anyone transitioning to `AuthenticationProviderOpenID` will need.
### Modifications
* Implement `AuthenticationListState#authenticateAsync` using a recursive algorithm that first attempts to authenticate the client using the current `authState` and then tries the remaining options.
* Implement `AuthenticationProviderList#authenticateAsync` using a recursive algorithm that attempts each provider sequentially.
* Add test to `AuthenticationProviderListTest` that exercises this method. It didn't technically fail previously, but it's worth adding.
* Add test to `AuthenticationProviderOpenIDIntegrationTest` to cover the exact failures that were causing problems.
---
...uthenticationProviderOpenIDIntegrationTest.java | 57 +++++++++++++
.../authentication/AuthenticationProviderList.java | 95 +++++++++++++++++++++-
.../AuthenticationProviderListTest.java | 24 ++++++
3 files changed, 175 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
index 298492652c0..0075d70f599 100644
--- a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
+++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
@@ -31,6 +31,7 @@ import static org.testng.Assert.fail;
import com.github.tomakehurst.wiremock.WireMockServer;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.impl.DefaultJwtBuilder;
+import io.jsonwebtoken.io.Decoders;
import io.jsonwebtoken.security.Keys;
import java.io.IOException;
import java.nio.file.Files;
@@ -41,13 +42,19 @@ import java.security.interfaces.RSAPublicKey;
import java.util.Base64;
import java.util.Date;
import java.util.HashMap;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.naming.AuthenticationException;
+import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.common.api.AuthData;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -438,6 +445,56 @@ public class AuthenticationProviderOpenIDIntegrationTest {
assertTrue(state.isExpired());
}
+    /**
+     * This test covers the migration scenario where both the Token and OpenID providers are configured. It ensures
+     * that both kinds of authentication work, first through the provider directly and then through a newly
+     * created authentication state for each kind of token.
+     * @throws Exception if key setup, service construction, or any authentication attempt fails
+     */
+    @Test
+    public void testAuthenticationProviderListStateSuccess() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderOpenID.class.getName(),
+                AuthenticationProviderToken.class.getName()));
+        Properties props = conf.getProperties();
+        props.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
+
+        // Set up static token
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+        // Use public key for validation
+        String publicKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPublic());
+        props.setProperty("tokenPublicKey", publicKeyStr);
+        // Use private key to generate token
+        String privateKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPrivate());
+        PrivateKey privateKey = AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr),
+                SignatureAlgorithm.RS256);
+        String staticToken = AuthTokenUtils.createToken(privateKey, "superuser", Optional.empty());
+
+        @Cleanup
+        AuthenticationService service = new AuthenticationService(conf);
+        AuthenticationProvider provider = service.getAuthenticationProvider("token");
+
+        // First, authenticate using OIDC
+        String role = "superuser";
+        String oidcToken = generateToken(validJwk, issuer, role, "allowed-audience", 0L, 0L, 10000L);
+        assertEquals(role, provider.authenticateAsync(new AuthenticationDataCommand(oidcToken)).get());
+
+        // Authenticate using the static token
+        assertEquals("superuser", provider.authenticateAsync(new AuthenticationDataCommand(staticToken)).get());
+
+        // Use authenticationState to authenticate using OIDC
+        AuthenticationState state1 = service.getAuthenticationProvider("token").newAuthState(null, null, null);
+        assertNull(state1.authenticateAsync(AuthData.of(oidcToken.getBytes())).get());
+        assertEquals(state1.getAuthRole(), role);
+
+        // Use authenticationState to authenticate using static token
+        AuthenticationState state2 = service.getAuthenticationProvider("token").newAuthState(null, null, null);
+        assertNull(state2.authenticateAsync(AuthData.of(staticToken.getBytes())).get());
+        // Check state2 (not state1) so the role produced by the static-token path is what gets verified.
+        assertEquals(state2.getAuthRole(), role);
+    }
+
@Test
void ensureRoleClaimForNonSubClaimReturnsRole() throws Exception {
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
index f8a96aa624e..16d6f9859a0 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.servlet.http.HttpServletRequest;
@@ -76,9 +77,12 @@ public class AuthenticationProviderList implements AuthenticationProvider {
private static class AuthenticationListState implements AuthenticationState {
private final List<AuthenticationState> states;
- private AuthenticationState authState;
+ private volatile AuthenticationState authState;
AuthenticationListState(List<AuthenticationState> states) {
+ if (states == null || states.isEmpty()) { // fail fast: states.get(0) below needs at least one element
+ throw new IllegalArgumentException("Authentication state requires at least one state");
+ }
this.states = states;
this.authState = states.get(0); // the first state is the initially selected one
}
@@ -96,6 +100,61 @@ public class AuthenticationProviderList implements AuthenticationProvider {
return getAuthState().getAuthRole();
}
+        /**
+         * Authenticates the client, first with the currently selected auth state and, if that fails, with
+         * each of the remaining states.
+         *
+         * @param authData the authentication data supplied by the client
+         * @return a future completed with the auth challenge returned by the state that accepted the data,
+         *     or completed exceptionally when no state accepts it
+         */
+        @Override
+        public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
+            CompletableFuture<AuthData> authChallengeFuture = new CompletableFuture<>();
+            // Read the volatile field once so the debug log below names the state that was actually tried,
+            // even if another callback reassigns authState in the meantime.
+            final AuthenticationState currentState = authState;
+            // First, attempt to authenticate with the current auth state
+            currentState
+                    .authenticateAsync(authData)
+                    .whenComplete((authChallenge, ex) -> {
+                        if (ex == null) {
+                            // Current authState is still correct. Just need to return the authChallenge.
+                            authChallengeFuture.complete(authChallenge);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Authentication failed for auth provider " + currentState.getClass() + ": ", ex);
+                            }
+                            // Fall back to the other states, walking from the end of the list toward index 0.
+                            authenticateRemainingAuthStates(authChallengeFuture, authData, ex, states.size() - 1);
+                        }
+                    });
+            return authChallengeFuture;
+        }
+
+
+        /**
+         * Recursively tries the auth states from {@code index} down to 0, skipping the currently selected
+         * state, until one of them accepts the client's data.
+         *
+         * @param authChallengeFuture future to complete with the challenge from the first state that succeeds,
+         *     or exceptionally once every state has been tried
+         * @param clientAuthData the authentication data supplied by the client
+         * @param previousException the most recent failure; reported if no remaining state succeeds
+         * @param index position in {@code states} of the next state to try; a negative value means none are left
+         */
+        private void authenticateRemainingAuthStates(CompletableFuture<AuthData> authChallengeFuture,
+                                                     AuthData clientAuthData,
+                                                     Throwable previousException,
+                                                     int index) {
+            if (index < 0) {
+                // Every state has been tried: fail with the last real cause, or a generic one if there is none.
+                if (previousException == null) {
+                    previousException = new AuthenticationException("Authentication required");
+                }
+                AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
+                        "authentication-provider-list", "Authentication required");
+                authChallengeFuture.completeExceptionally(previousException);
+                return;
+            }
+            AuthenticationState state = states.get(index);
+            if (state == authState) {
+                // Skip the current auth state, but carry the last failure forward so the real cause is not
+                // replaced by the generic "Authentication required" exception.
+                authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, previousException, index - 1);
+            } else {
+                state.authenticateAsync(clientAuthData)
+                        .whenComplete((authChallenge, ex) -> {
+                            if (ex == null) {
+                                // Found the correct auth state
+                                authState = state;
+                                authChallengeFuture.complete(authChallenge);
+                            } else {
+                                if (log.isDebugEnabled()) {
+                                    // Log the state that just failed, not the currently selected one.
+                                    log.debug("Authentication failed for auth provider "
+                                            + state.getClass() + ": ", ex);
+                                }
+                                authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, ex, index - 1);
+                            }
+                        });
+            }
+        }
+
+
@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
return applyAuthProcessor(
@@ -160,6 +219,40 @@ public class AuthenticationProviderList implements AuthenticationProvider {
return providers.get(0).getAuthMethodName();
}
+    /**
+     * Asynchronously authenticates the given data against the configured providers, starting with the last
+     * provider in the list and working toward the first.
+     *
+     * @param authData the authentication data to verify
+     * @return a future completed with the role reported by the first provider that succeeds, or completed
+     *     exceptionally when every provider fails
+     */
+    @Override
+    public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
+        final CompletableFuture<String> result = new CompletableFuture<>();
+        final int lastIndex = providers.size() - 1;
+        // No failure has been observed yet, hence the null previous exception.
+        authenticateRemainingAuthProviders(result, authData, null, lastIndex);
+        return result;
+    }
+
+
+    /**
+     * Tries the provider at {@code index}; on failure, recurses to the provider at {@code index - 1}.
+     *
+     * @param roleFuture future to complete with the role from the first provider that succeeds, or
+     *     exceptionally once no provider is left
+     * @param authData the authentication data to verify
+     * @param previousException the most recent failure, or {@code null} if none has occurred yet
+     * @param index position in {@code providers} of the provider to try; a negative value means none are left
+     */
+    private void authenticateRemainingAuthProviders(CompletableFuture<String> roleFuture,
+                                                    AuthenticationDataSource authData,
+                                                    Throwable previousException,
+                                                    int index) {
+        if (index >= 0) {
+            final AuthenticationProvider candidate = providers.get(index);
+            candidate.authenticateAsync(authData).whenComplete((authenticatedRole, failure) -> {
+                if (failure != null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Authentication failed for auth provider " + candidate.getClass() + ": ", failure);
+                    }
+                    // Move on to the next provider, remembering why this one failed.
+                    authenticateRemainingAuthProviders(roleFuture, authData, failure, index - 1);
+                    return;
+                }
+                roleFuture.complete(authenticatedRole);
+            });
+            return;
+        }
+
+        // No providers left: fail with the last recorded cause, or a generic one if there is none.
+        final Throwable cause = previousException != null
+                ? previousException
+                : new AuthenticationException("Authentication required");
+        AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
+                "authentication-provider-list", "Authentication required");
+        roleFuture.completeExceptionally(cause);
+    }
+
+
@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
return applyAuthProcessor(
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
index df011412fee..7793a5c029f 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
@@ -161,6 +161,30 @@ public class AuthenticationProviderListTest {
testAuthenticate(tokenBB, SUBJECT_B);
}
+    /**
+     * Authenticates {@code token} through {@code authProvider.authenticateAsync} and asserts that the
+     * resulting subject equals {@code expectedSubject}.
+     */
+    private void testAuthenticateAsync(String token, String expectedSubject) throws Exception {
+        // Minimal data source that exposes the token as command data.
+        AuthenticationDataSource commandData = new AuthenticationDataSource() {
+            @Override
+            public String getCommandData() {
+                return token;
+            }
+
+            @Override
+            public boolean hasDataFromCommand() {
+                return true;
+            }
+        };
+        String actualSubject = authProvider.authenticateAsync(commandData).get();
+        assertEquals(actualSubject, expectedSubject);
+    }
+
+
+    /**
+     * Runs the async authentication check for each token/subject pairing, in a fixed order.
+     */
+    @Test
+    public void testAuthenticateAsync() throws Exception {
+        String[][] tokenAndSubject = {
+                {tokenAA, SUBJECT_A},
+                {tokenAB, SUBJECT_B},
+                {tokenBA, SUBJECT_A},
+                {tokenBB, SUBJECT_B},
+        };
+        for (String[] pair : tokenAndSubject) {
+            testAuthenticateAsync(pair[0], pair[1]);
+        }
+    }
+
+
+
private AuthenticationState newAuthState(String token, String expectedSubject) throws Exception {
// Must pass the token to the newAuthState for legacy reasons.
AuthenticationState authState = authProvider.newAuthState(