You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/04/19 01:27:47 UTC

[pulsar] branch branch-3.0 updated: [fix][broker] Implement authenticateAsync for AuthenticationProviderList (#20132)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 370f6d7af78 [fix][broker] Implement authenticateAsync for AuthenticationProviderList (#20132)
370f6d7af78 is described below

commit 370f6d7af78fa7bd8f92f8116fac4e3cf32adecb
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Tue Apr 18 20:24:58 2023 -0500

    [fix][broker] Implement authenticateAsync for AuthenticationProviderList (#20132)
    
    PIP: #12105 and #19771
    
    ### Motivation
    
    With the implementation of asynchronous authentication in PIP 97, I missed a case in the `AuthenticationProviderList` where we need to implement the `authenticateAsync` methods. This PR is necessary for making the `AuthenticationProviderToken` and the `AuthenticationProviderOpenID` work together, which is necessary for anyone transitioning to `AuthenticationProviderOpenID`.
    
    ### Modifications
    
    * Implement `AuthenticationListState#authenticateAsync` using a recursive algorithm that first attempts to authenticate the client using the current `authState` and then tries the remaining options.
    * Implement `AuthenticationProviderList#authenticateAsync` using a recursive algorithm that attempts each provider sequentially.
    * Add test to `AuthenticationProviderListTest` that exercises this method. It didn't technically fail previously, but it's worth adding.
    * Add test to `AuthenticationProviderOpenIDIntegrationTest` to cover the exact failures that were causing problems.
    
    (cherry picked from commit 58ccf020fc0fa5f7d9b70032ae60c8bf9a3f234f)
---
 ...uthenticationProviderOpenIDIntegrationTest.java | 57 +++++++++++++
 .../authentication/AuthenticationProviderList.java | 95 +++++++++++++++++++++-
 .../AuthenticationProviderListTest.java            | 24 ++++++
 3 files changed, 175 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
index 298492652c0..0075d70f599 100644
--- a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
+++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
@@ -31,6 +31,7 @@ import static org.testng.Assert.fail;
 import com.github.tomakehurst.wiremock.WireMockServer;
 import io.jsonwebtoken.SignatureAlgorithm;
 import io.jsonwebtoken.impl.DefaultJwtBuilder;
+import io.jsonwebtoken.io.Decoders;
 import io.jsonwebtoken.security.Keys;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -41,13 +42,19 @@ import java.security.interfaces.RSAPublicKey;
 import java.util.Base64;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import javax.naming.AuthenticationException;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.common.api.AuthData;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -438,6 +445,56 @@ public class AuthenticationProviderOpenIDIntegrationTest {
         assertTrue(state.isExpired());
     }
 
+    /**
+     * Covers the migration scenario where both the Token and OpenID providers are configured. It ensures that
+     * an OIDC token and a static token each authenticate, via the provider and via a new authentication state.
+     * @throws Exception if the service cannot be created or an authentication future completes exceptionally
+     */
+    @Test
+    public void testAuthenticationProviderListStateSuccess() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderOpenID.class.getName(),
+                AuthenticationProviderToken.class.getName()));
+        Properties props = conf.getProperties();
+        props.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
+
+        // Set up static token
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+        // Use public key for validation
+        String publicKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPublic());
+        props.setProperty("tokenPublicKey", publicKeyStr);
+        // Use private key to generate token
+        String privateKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPrivate());
+        PrivateKey privateKey = AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr),
+                SignatureAlgorithm.RS256);
+        String staticToken = AuthTokenUtils.createToken(privateKey, "superuser", Optional.empty());
+
+        @Cleanup
+        AuthenticationService service = new AuthenticationService(conf);
+        AuthenticationProvider provider = service.getAuthenticationProvider("token");
+
+        // First, authenticate using OIDC
+        String role = "superuser";
+        String oidcToken = generateToken(validJwk, issuer, role, "allowed-audience", 0L, 0L, 10000L);
+        assertEquals(provider.authenticateAsync(new AuthenticationDataCommand(oidcToken)).get(), role);
+
+        // Authenticate using the static token
+        assertEquals(provider.authenticateAsync(new AuthenticationDataCommand(staticToken)).get(), "superuser");
+
+        // Use authenticationState to authenticate using OIDC
+        AuthenticationState state1 = service.getAuthenticationProvider("token").newAuthState(null, null, null);
+        assertNull(state1.authenticateAsync(AuthData.of(oidcToken.getBytes())).get());
+        assertEquals(state1.getAuthRole(), role);
+
+        // Use authenticationState to authenticate using static token (assert on state2, not state1)
+        AuthenticationState state2 = service.getAuthenticationProvider("token").newAuthState(null, null, null);
+        assertNull(state2.authenticateAsync(AuthData.of(staticToken.getBytes())).get());
+        assertEquals(state2.getAuthRole(), role);
+    }
+
     @Test
     void ensureRoleClaimForNonSubClaimReturnsRole() throws Exception {
         AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
index f8a96aa624e..16d6f9859a0 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import javax.servlet.http.HttpServletRequest;
@@ -76,9 +77,12 @@ public class AuthenticationProviderList implements AuthenticationProvider {
     private static class AuthenticationListState implements AuthenticationState {
 
         private final List<AuthenticationState> states;
-        private AuthenticationState authState;
+        private volatile AuthenticationState authState;
 
         AuthenticationListState(List<AuthenticationState> states) {
+            if (states == null || states.isEmpty()) {
+                throw new IllegalArgumentException("Authentication state requires at least one state");
+            }
             this.states = states;
             this.authState = states.get(0);
         }
@@ -96,6 +100,61 @@ public class AuthenticationProviderList implements AuthenticationProvider {
             return getAuthState().getAuthRole();
         }
 
+        @Override
+        public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
+            // First, attempt to authenticate with the current auth state
+            CompletableFuture<AuthData> authChallengeFuture = new CompletableFuture<>();
+            authState
+                    .authenticateAsync(authData)
+                    .whenComplete((authChallenge, ex) -> {
+                        if (ex == null) {
+                            // Current authState is still correct. Just need to return the authChallenge.
+                            authChallengeFuture.complete(authChallenge);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Authentication failed for auth provider " + authState.getClass() + ": ", ex);
+                            }
+                            authenticateRemainingAuthStates(authChallengeFuture, authData, ex, states.size() - 1);
+                        }
+                    });
+            return authChallengeFuture;
+        }
+
+        private void authenticateRemainingAuthStates(CompletableFuture<AuthData> authChallengeFuture,
+                                                     AuthData clientAuthData,
+                                                     Throwable previousException,
+                                                     int index) {
+            if (index < 0) {
+                if (previousException == null) {
+                    previousException = new AuthenticationException("Authentication required");
+                }
+                AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
+                        "authentication-provider-list", "Authentication required");
+                authChallengeFuture.completeExceptionally(previousException);
+                return;
+            }
+            AuthenticationState state = states.get(index);
+            if (state == authState) {
+                // Skip the current auth state
+                authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, null, index - 1);
+            } else {
+                state.authenticateAsync(clientAuthData)
+                        .whenComplete((authChallenge, ex) -> {
+                            if (ex == null) {
+                                // Found the correct auth state
+                                authState = state;
+                                authChallengeFuture.complete(authChallenge);
+                            } else {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Authentication failed for auth provider "
+                                            + authState.getClass() + ": ", ex);
+                                }
+                                authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, ex, index - 1);
+                            }
+                        });
+            }
+        }
+
         @Override
         public AuthData authenticate(AuthData authData) throws AuthenticationException {
             return applyAuthProcessor(
@@ -160,6 +219,40 @@ public class AuthenticationProviderList implements AuthenticationProvider {
         return providers.get(0).getAuthMethodName();
     }
 
+    @Override
+    public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
+        CompletableFuture<String> roleFuture = new CompletableFuture<>();
+        authenticateRemainingAuthProviders(roleFuture, authData, null, providers.size() - 1);
+        return roleFuture;
+    }
+
+    /** Tries providers from {@code index} down to 0; completes with the first role or the last failure. */
+    private void authenticateRemainingAuthProviders(CompletableFuture<String> roleFuture,
+                                                    AuthenticationDataSource authData,
+                                                    Throwable previousException,
+                                                    int index) {
+        if (index < 0) {
+            Throwable failure = previousException != null
+                    ? previousException : new AuthenticationException("Authentication required");
+            AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
+                    "authentication-provider-list", "Authentication required");
+            roleFuture.completeExceptionally(failure);
+            return;
+        }
+        AuthenticationProvider provider = providers.get(index);
+        provider.authenticateAsync(authData)
+                .whenComplete((role, ex) -> {
+                    if (ex == null) {
+                        roleFuture.complete(role);
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Authentication failed for auth provider " + provider.getClass() + ": ", ex);
+                        }
+                        authenticateRemainingAuthProviders(roleFuture, authData, ex, index - 1);
+                    }
+                });
+    }
+
     @Override
     public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
         return applyAuthProcessor(
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
index df011412fee..7793a5c029f 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
@@ -161,6 +161,30 @@ public class AuthenticationProviderListTest {
         testAuthenticate(tokenBB, SUBJECT_B);
     }
 
+    /** Authenticates the token via {@code authProvider.authenticateAsync} and asserts the resulting subject. */
+    private void testAuthenticateAsync(String token, String expectedSubject) throws Exception {
+        AuthenticationDataSource commandData = new AuthenticationDataSource() {
+            @Override
+            public boolean hasDataFromCommand() {
+                return true;
+            }
+
+            @Override
+            public String getCommandData() {
+                return token;
+            }
+        };
+        assertEquals(authProvider.authenticateAsync(commandData).get(), expectedSubject);
+    }
+
+    @Test
+    public void testAuthenticateAsync() throws Exception {
+        testAuthenticateAsync(tokenAA, SUBJECT_A);
+        testAuthenticateAsync(tokenAB, SUBJECT_B);
+        testAuthenticateAsync(tokenBA, SUBJECT_A);
+        testAuthenticateAsync(tokenBB, SUBJECT_B);
+    }
+
     private AuthenticationState newAuthState(String token, String expectedSubject) throws Exception {
         // Must pass the token to the newAuthState for legacy reasons.
         AuthenticationState authState = authProvider.newAuthState(