You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/02/05 11:32:58 UTC

[pulsar] branch master updated: [improve][broker] Add test to verify authRole cannot change (#19430)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aa7af1099e7 [improve][broker] Add test to verify authRole cannot change (#19430)
aa7af1099e7 is described below

commit aa7af1099e72e328be1e7f0c332060b69e6aa0d7
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Sun Feb 5 05:32:46 2023 -0600

    [improve][broker] Add test to verify authRole cannot change (#19430)
---
 .../MockAlwaysExpiredAuthenticationProvider.java   |  46 +++++++++
 .../auth/MockAlwaysExpiredAuthenticationState.java |  86 +++++++++++++++++
 .../pulsar/broker/service/ServerCnxTest.java       | 107 +++++++++++++++++++++
 3 files changed, 239 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationProvider.java
new file mode 100644
index 00000000000..d9442bf8c6e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.common.api.AuthData;
+
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+import java.net.SocketAddress;
+
+/**
+ * Class that provides the same authentication semantics as the {@link MockAuthenticationProvider} except
+ * that this one initializes the {@link MockAlwaysExpiredAuthenticationState} class to support testing
+ * expired authentication and auth refresh.
+ */
+public class MockAlwaysExpiredAuthenticationProvider extends MockAuthenticationProvider {
+
+    @Override
+    public String getAuthMethodName() {
+        return "always-expired";
+    }
+
+    @Override
+    public AuthenticationState newAuthState(AuthData authData,
+                                            SocketAddress remoteAddress,
+                                            SSLSession sslSession) throws AuthenticationException {
+        return new MockAlwaysExpiredAuthenticationState(this);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java
new file mode 100644
index 00000000000..95751ed0b85
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.common.api.AuthData;
+
+import javax.naming.AuthenticationException;
+import java.util.concurrent.CompletableFuture;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Class to use when verifying the behavior around expired authentication data because it will always return
+ * true when isExpired is called.
+ */
+public class MockAlwaysExpiredAuthenticationState implements AuthenticationState {
+    final MockAlwaysExpiredAuthenticationProvider provider;
+    AuthenticationDataSource authenticationDataSource;
+    volatile String authRole;
+
+    MockAlwaysExpiredAuthenticationState(MockAlwaysExpiredAuthenticationProvider provider) {
+        this.provider = provider;
+    }
+
+
+    @Override
+    public String getAuthRole() throws AuthenticationException {
+        if (authRole == null) {
+            throw new AuthenticationException("Must authenticate first.");
+        }
+        return authRole;
+    }
+
+    @Override
+    public AuthData authenticate(AuthData authData) throws AuthenticationException {
+        return null;
+    }
+
+    /**
+     * This authentication is always single stage, so it returns immediately
+     */
+    @Override
+    public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
+        authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8));
+        return provider
+                .authenticateAsync(authenticationDataSource)
+                .thenApply(role -> {
+                    authRole = role;
+                    return null;
+                });
+    }
+
+    @Override
+    public AuthenticationDataSource getAuthDataSource() {
+        return authenticationDataSource;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return true;
+    }
+
+    @Override
+    public boolean isExpired() {
+        return true;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5b45a16d3dc..bd4adef2cd1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -74,6 +74,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
@@ -476,6 +477,112 @@ public class ServerCnxTest {
         channel.finish();
     }
 
+    public void testAuthChallengePrincipalChangeFails() throws Exception {
+        AuthenticationService authenticationService = mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+
+        when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", "");
+        channel.writeInbound(clientCommand);
+
+        Object responseConnected = getResponse();
+        assertTrue(responseConnected instanceof CommandConnected);
+        assertEquals(serverCnx.getState(), State.Connected);
+        assertEquals(serverCnx.getPrincipal(), "pass.client");
+        assertTrue(serverCnx.isActive());
+
+        // Trigger the ServerCnx to check if authentication is expired (it is because of our special implementation)
+        // and then force channel to run the task
+        serverCnx.refreshAuthenticationCredentials();
+        channel.runPendingTasks();
+        Object responseAuthChallenge1 = getResponse();
+        assertTrue(responseAuthChallenge1 instanceof CommandAuthChallenge);
+
+        // Respond with valid info that will both pass and be the same
+        ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("pass.client".getBytes()), 1, "");
+        channel.writeInbound(authResponse1);
+
+        // Trigger the ServerCnx to check if authentication is expired again
+        serverCnx.refreshAuthenticationCredentials();
+        assertTrue(channel.hasPendingTasks(), "This test assumes there are pending tasks to run.");
+        channel.runPendingTasks();
+        Object responseAuthChallenge2 = getResponse();
+        assertTrue(responseAuthChallenge2 instanceof CommandAuthChallenge);
+
+        // Respond with invalid info that will pass but have a different authRole
+        ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("pass.client2".getBytes()), 1, "");
+        channel.writeInbound(authResponse2);
+
+        // Expect the connection to disconnect
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
+
+        channel.finish();
+    }
+
+    public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception {
+        AuthenticationService authenticationService = mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+
+        when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticateOriginalAuthData(true);
+        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null,
+                null, "pass.client", "pass.client", authMethodName);
+        channel.writeInbound(clientCommand);
+
+        Object responseConnected = getResponse();
+        assertTrue(responseConnected instanceof CommandConnected);
+        assertEquals(serverCnx.getState(), State.Connected);
+        assertEquals(serverCnx.getAuthRole(), "pass.proxy");
+        // These are all taken without verifying the auth data
+        assertEquals(serverCnx.getPrincipal(), "pass.client");
+        assertEquals(serverCnx.getOriginalPrincipal(), "pass.client");
+        assertTrue(serverCnx.isActive());
+
+        // Trigger the ServerCnx to check if authentication is expired (it is because of our special implementation)
+        // and then force channel to run the task
+        serverCnx.refreshAuthenticationCredentials();
+        assertTrue(channel.hasPendingTasks(), "This test assumes there are pending tasks to run.");
+        channel.runPendingTasks();
+        Object responseAuthChallenge1 = getResponse();
+        assertTrue(responseAuthChallenge1 instanceof CommandAuthChallenge);
+
+        // Respond with valid info that will both pass and be the same
+        ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("pass.client".getBytes()), 1, "");
+        channel.writeInbound(authResponse1);
+
+        // Trigger the ServerCnx to check if authentication is expired again
+        serverCnx.refreshAuthenticationCredentials();
+        channel.runPendingTasks();
+        Object responseAuthChallenge2 = getResponse();
+        assertTrue(responseAuthChallenge2 instanceof CommandAuthChallenge);
+
+        // Respond with invalid info that will pass but have a different authRole
+        ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("pass.client2".getBytes()), 1, "");
+        channel.writeInbound(authResponse2);
+
+        // Expect the connection to disconnect
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
+
+        channel.finish();
+    }
+
     @Test(timeOut = 30000)
     public void testConnectCommandWithAuthenticationNegative() throws Exception {
         AuthenticationService authenticationService = mock(AuthenticationService.class);