You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/02/14 19:48:08 UTC

[pulsar] branch branch-2.11 updated: [improve][broker] Require authRole is proxyRole to set originalPrincipal (#19455)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new c7eabc93913 [improve][broker] Require authRole is proxyRole to set originalPrincipal (#19455)
c7eabc93913 is described below

commit c7eabc9391361508fc222469479bfea83d5d0c64
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Tue Feb 14 00:20:27 2023 -0600

    [improve][broker] Require authRole is proxyRole to set originalPrincipal (#19455)
    
    Co-authored-by: Lari Hotari <lh...@apache.org>
    (cherry picked from commit aa63a5567a9e5d466b311a54d5dcc2cb05c2b5cd)
---
 .../broker/authorization/AuthorizationService.java |  67 ++++++++++++++-------
 .../broker/admin/impl/PersistentTopicsBase.java    |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  31 +---------
 .../pulsar/broker/web/PulsarWebResource.java       |  27 +++------
 .../pulsar/broker/auth/AuthorizationTest.java      |  41 ++++++++++++-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   7 +++
 .../pulsar/broker/service/ServerCnxTest.java       |   4 ++
 .../client/impl/AdminApiKeyStoreTlsAuthTest.java   |  18 +++---
 .../ProxyAuthenticatedProducerConsumerTest.java    |  44 +++++++++-----
 .../server/ProxyWithAuthorizationNegTest.java      |   2 +
 .../server/ProxyWithJwtAuthorizationTest.java      |   4 +-
 tests/certificate-authority/generate_keystore.sh   |  11 ++++
 .../certificate-authority/jks/broker.keystore.jks  | Bin 2254 -> 2254 bytes
 .../jks/broker.truststore.jks                      | Bin 978 -> 969 bytes
 .../jks/broker.truststore.nopassword.jks           | Bin 978 -> 969 bytes
 .../certificate-authority/jks/client.keystore.jks  | Bin 2258 -> 2257 bytes
 .../jks/client.truststore.jks                      | Bin 980 -> 971 bytes
 .../jks/client.truststore.nopassword.jks           | Bin 980 -> 971 bytes
 .../jks/proxy-and-client.truststore.jks            | Bin 0 -> 1891 bytes
 .../jks/proxy-and-client.truststore.nopassword.jks | Bin 0 -> 1891 bytes
 tests/certificate-authority/jks/proxy.keystore.jks | Bin 0 -> 2245 bytes
 .../certificate-authority/jks/proxy.truststore.jks | Bin 0 -> 971 bytes
 .../jks/proxy.truststore.nopassword.jks            | Bin 0 -> 971 bytes
 23 files changed, 163 insertions(+), 95 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 3baaf57990a..05f146e8953 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -19,10 +19,10 @@
 package org.apache.pulsar.broker.authorization;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import java.net.SocketAddress;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -37,7 +37,6 @@ import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
@@ -293,19 +292,39 @@ public class AuthorizationService {
         return provider.allowSinkOpsAsync(namespaceName, role, authenticationData);
     }
 
-    private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
-                                                  String originalPrincipal) {
-        if (proxyRoles.contains(authenticatedPrincipal)) {
-            // Request has come from a proxy
+    public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
+                                            String originalPrincipal,
+                                            AuthenticationDataSource authDataSource) {
+        SocketAddress remoteAddress = authDataSource != null ? authDataSource.getPeerAddress() : null;
+        return isValidOriginalPrincipal(authenticatedPrincipal, originalPrincipal, remoteAddress);
+    }
+
+    /**
+     * Validates that the authenticatedPrincipal and the originalPrincipal are a valid combination.
+     * Valid combinations fulfill the following rule: the authenticatedPrincipal is in
+     * {@link ServiceConfiguration#getProxyRoles()}, if, and only if, the originalPrincipal is set to a role
+     * that is not also in {@link ServiceConfiguration#getProxyRoles()}.
+     * @return true when roles are a valid combination and false when roles are an invalid combination
+     */
+    public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
+                                            String originalPrincipal,
+                                            SocketAddress remoteAddress) {
+        String errorMsg = null;
+        if (conf.getProxyRoles().contains(authenticatedPrincipal)) {
             if (StringUtils.isBlank(originalPrincipal)) {
-                log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
-                throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the "
-                        + "request is via proxy.");
-            }
-            if (proxyRoles.contains(originalPrincipal)) {
-                log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
-                throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
+                errorMsg = "originalPrincipal must be provided when connecting with a proxy role.";
+            } else if (conf.getProxyRoles().contains(originalPrincipal)) {
+                errorMsg = "originalPrincipal cannot be a proxy role.";
             }
+        } else if (StringUtils.isNotBlank(originalPrincipal)) {
+            errorMsg = "cannot specify originalPrincipal when connecting without valid proxy role.";
+        }
+        if (errorMsg != null) {
+            log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress,
+                    authenticatedPrincipal, originalPrincipal, errorMsg);
+            return false;
+        } else {
+            return true;
         }
     }
 
@@ -340,7 +359,9 @@ public class AuthorizationService {
                                                                 String originalRole,
                                                                 String role,
                                                                 AuthenticationDataSource authData) {
-        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+            return CompletableFuture.completedFuture(false);
+        }
         if (isProxyRole(role)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync(
                     tenantName, operation, role, authData);
@@ -396,7 +417,9 @@ public class AuthorizationService {
                                                                    String originalRole,
                                                                    String role,
                                                                    AuthenticationDataSource authData) {
-        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+            return CompletableFuture.completedFuture(false);
+        }
         if (isProxyRole(role)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync(
                     namespaceName, operation, role, authData);
@@ -438,7 +461,9 @@ public class AuthorizationService {
                                                                          String originalRole,
                                                                          String role,
                                                                          AuthenticationDataSource authData) {
-        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+            return CompletableFuture.completedFuture(false);
+        }
         if (isProxyRole(role)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
                     namespaceName, policy, operation, role, authData);
@@ -495,10 +520,8 @@ public class AuthorizationService {
                                                                      String originalRole,
                                                                      String role,
                                                                      AuthenticationDataSource authData) {
-        try {
-            validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
-        } catch (RestException e) {
-            return FutureUtil.failedFuture(e);
+        if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+            return CompletableFuture.completedFuture(false);
         }
         if (isProxyRole(role)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicPolicyOperationAsync(
@@ -582,7 +605,9 @@ public class AuthorizationService {
                                                                String originalRole,
                                                                String role,
                                                                AuthenticationDataSource authData) {
-        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+            return CompletableFuture.completedFuture(false);
+        }
         if (isProxyRole(role)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicOperationAsync(
                     topicName, operation, role, authData);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c175b22c91f..aed4c29e0ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4171,7 +4171,7 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
-    public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
+    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
             PulsarService pulsar, String clientAppId, String originalPrincipal,
             AuthenticationDataSource authenticationData, TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9cb2f9c74e0..6d0473ed99c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -196,7 +196,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     private int nonPersistentPendingMessages = 0;
     private final int maxNonPersistentPendingMessages;
     private String originalPrincipal = null;
-    private Set<String> proxyRoles;
     private final boolean schemaValidationEnforced;
     private String authMethod = "none";
     private final int maxMessageSize;
@@ -268,7 +267,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         this.recentlyClosedProducers = new HashMap<>();
         this.replicatorPrefix = conf.getReplicatorPrefix();
         this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
-        this.proxyRoles = conf.getProxyRoles();
         this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
         this.maxMessageSize = conf.getMaxMessageSize();
         this.maxPendingSendRequests = conf.getMaxPendingPublishRequestsPerConnection();
@@ -378,32 +376,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         ctx.close();
     }
 
-    /**
-     * When transitioning from Connecting to Connected, this method validates the roles.
-     * If the authRole is one of proxyRoles, the following must be true:
-     * - the originalPrincipal is given while connecting
-     * - originalPrincipal is not blank
-     * - originalPrincipal is not a proxy principal.
-     * @return true when roles are valid and false when roles are invalid
-     */
-    private boolean isValidRoleAndOriginalPrincipal() {
-        String errorMsg = null;
-        if (proxyRoles.contains(authRole)) {
-            if (StringUtils.isBlank(originalPrincipal)) {
-                errorMsg = "originalPrincipal must be provided when connecting with a proxy role.";
-            } else if (proxyRoles.contains(originalPrincipal)) {
-                errorMsg = "originalPrincipal cannot be a proxy role.";
-            }
-        }
-        if (errorMsg != null) {
-            log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress, authRole,
-                    originalPrincipal, errorMsg);
-            return false;
-        } else {
-            return true;
-        }
-    }
-
     // ////
     // // Incoming commands handling
     // ////
@@ -663,7 +635,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     // complete the connect and sent newConnected command
     private void completeConnect(int clientProtoVersion, String clientVersion, boolean supportsTopicWatchers) {
         if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
-            if (!isValidRoleAndOriginalPrincipal()) {
+            if (!service.getAuthorizationService()
+                    .isValidOriginalPrincipal(authRole, originalPrincipal, remoteAddress)) {
                 state = State.Failed;
                 service.getPulsarStats().recordConnectionCreateFail();
                 final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 910db719a97..435ff119144 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -151,19 +151,11 @@ public abstract class PulsarWebResource {
         return appId != null;
     }
 
-    private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
-                                                  String originalPrincipal) {
-        if (proxyRoles.contains(authenticatedPrincipal)) {
-            // Request has come from a proxy
-            if (StringUtils.isBlank(originalPrincipal)) {
-                log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
-                throw new RestException(Status.UNAUTHORIZED,
-                        "Original principal cannot be empty if the request is via proxy.");
-            }
-            if (proxyRoles.contains(originalPrincipal)) {
-                log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
-                throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
-            }
+    private void validateOriginalPrincipal(String authenticatedPrincipal, String originalPrincipal) {
+        if (!pulsar.getBrokerService().getAuthorizationService()
+                .isValidOriginalPrincipal(authenticatedPrincipal, originalPrincipal, clientAuthData())) {
+            throw new RestException(Status.UNAUTHORIZED,
+                    "Invalid combination of authenticated role and original principal.");
         }
     }
 
@@ -186,7 +178,7 @@ public abstract class PulsarWebResource {
                     isClientAuthenticated(appId), appId);
         }
         String originalPrincipal = originalPrincipal();
-        validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), appId, originalPrincipal);
+        validateOriginalPrincipal(appId, originalPrincipal);
 
         if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
             BrokerService brokerService = pulsar.getBrokerService();
@@ -261,7 +253,7 @@ public abstract class PulsarWebResource {
         }
     }
 
-    protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
+    protected void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
                                                 String originalPrincipal, String tenant,
                                                 AuthenticationDataSource authenticationData,
                                                 long timeout, TimeUnit unit) {
@@ -288,7 +280,7 @@ public abstract class PulsarWebResource {
                 clientAuthData());
     }
 
-    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(
             PulsarService pulsar, String clientAppId,
             String originalPrincipal, String tenant,
             AuthenticationDataSource authenticationData) {
@@ -307,8 +299,7 @@ public abstract class PulsarWebResource {
                         if (!isClientAuthenticated(clientAppId)) {
                             throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
                         }
-                        validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId,
-                                originalPrincipal);
+                        validateOriginalPrincipal(clientAppId, originalPrincipal);
                         if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
                             AuthorizationService authorizationService =
                                     pulsar.getBrokerService().getAuthorizationService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 39a91f72dc7..ad69180b236 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -22,8 +22,14 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
+import java.net.SocketAddress;
+import java.util.Collections;
 import java.util.EnumSet;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -32,11 +38,11 @@ import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import com.google.common.collect.Sets;
 
 @Test(groups = "flaky")
 public class AuthorizationTest extends MockedPulsarServiceBaseTest {
@@ -229,6 +235,39 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
         admin.clusters().deleteCluster("c1");
     }
 
+    @Test
+    public void testOriginalRoleValidation() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProxyRoles(Collections.singleton("proxy"));
+        AuthorizationService auth = new AuthorizationService(conf, Mockito.mock(PulsarResources.class));
+
+        // Original principal should be supplied when authenticatedPrincipal is proxy role
+        assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (SocketAddress) null));
+
+        // Non proxy role should not supply originalPrincipal
+        assertTrue(auth.isValidOriginalPrincipal("client", "", (SocketAddress) null));
+        assertTrue(auth.isValidOriginalPrincipal("client", null, (SocketAddress) null));
+
+        // Only likely in cases when authentication is disabled, but we still define these to be valid.
+        assertTrue(auth.isValidOriginalPrincipal(null, null, (SocketAddress) null));
+        assertTrue(auth.isValidOriginalPrincipal(null, "", (SocketAddress) null));
+        assertTrue(auth.isValidOriginalPrincipal("", null, (SocketAddress) null));
+        assertTrue(auth.isValidOriginalPrincipal("", "", (SocketAddress) null));
+
+        // Proxy role must supply an original principal
+        assertFalse(auth.isValidOriginalPrincipal("proxy", "", (SocketAddress) null));
+        assertFalse(auth.isValidOriginalPrincipal("proxy", null, (SocketAddress) null));
+
+        // OriginalPrincipal cannot be proxy role
+        assertFalse(auth.isValidOriginalPrincipal("proxy", "proxy", (SocketAddress) null));
+        assertFalse(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null));
+        assertFalse(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null));
+        assertFalse(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null));
+
+        // Must gracefully handle a missing AuthenticationDataSource
+        assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (AuthenticationDataSource) null));
+    }
+
     @Test
     public void testGetListWithGetBundleOp() throws Exception {
         String tenant = "p1";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 3b483efc33d..c0e78b8e32b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -105,6 +105,13 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
     public final static String CLIENT_KEYSTORE_PW = "111111";
     public final static String CLIENT_TRUSTSTORE_PW = "111111";
 
+    public final static String PROXY_KEYSTORE_FILE_PATH =
+            ResourceUtils.getAbsolutePath("certificate-authority/jks/proxy.keystore.jks");
+    public final static String PROXY_KEYSTORE_PW = "111111";
+    public final static String PROXY_AND_CLIENT_TRUSTSTORE_FILE_PATH =
+            ResourceUtils.getAbsolutePath("certificate-authority/jks/proxy-and-client.truststore.jks");
+    public final static String PROXY_AND_CLIENT_TRUSTSTORE_PW = "111111";
+
     public final static String CLIENT_KEYSTORE_CN = "clientuser";
     public final static String KEYSTORE_TYPE = "JKS";
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index ebfc09a3ab2..b181d6f00f8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -452,6 +452,10 @@ public class ServerCnxTest {
         verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "pass.proxy");
         verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "");
         verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", null);
+        // Invalid combination: a non-proxy role (pass.client) sets originalPrincipal to a proxy role (pass.proxy)
+        verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client", "pass.proxy");
+        // Invalid combination: a non-proxy role (pass.client1) must not set any originalPrincipal (pass.client)
+        verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client1", "pass.client");
     }
 
     private void verifyAuthRoleAndOriginalPrincipalBehavior(String authMethodName, String authData,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
index 22400e0ae64..67a1e56737c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.tls.NoopHostnameVerifier;
 import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.client.ClientProperties;
@@ -84,8 +83,8 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
         conf.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
 
         conf.setTlsTrustStoreType(KEYSTORE_TYPE);
-        conf.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
-        conf.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+        conf.setTlsTrustStore(PROXY_AND_CLIENT_TRUSTSTORE_FILE_PATH);
+        conf.setTlsTrustStorePassword(PROXY_AND_CLIENT_TRUSTSTORE_PW);
 
         conf.setClusterName(clusterName);
         conf.setTlsRequireTrustedClientCertOnConnect(true);
@@ -95,6 +94,7 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
 
         // config for authentication and authorization.
         conf.setSuperUserRoles(Sets.newHashSet(CLIENT_KEYSTORE_CN));
+        conf.setProxyRoles(Sets.newHashSet("proxy"));
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
         Set<String> providers = new HashSet<>();
@@ -140,13 +140,13 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
 
         SSLContext sslCtx = KeyStoreSSLContext.createClientSslContext(
                 KEYSTORE_TYPE,
-                CLIENT_KEYSTORE_FILE_PATH,
-                CLIENT_KEYSTORE_PW,
+                PROXY_KEYSTORE_FILE_PATH,
+                PROXY_KEYSTORE_PW,
                 KEYSTORE_TYPE,
                 BROKER_TRUSTSTORE_FILE_PATH,
                 BROKER_TRUSTSTORE_PW);
 
-        clientBuilder.sslContext(sslCtx).hostnameVerifier(NoopHostnameVerifier.INSTANCE);
+        clientBuilder.sslContext(sslCtx);
         Client client = clientBuilder.build();
 
         return client.target(brokerUrlTls.toString());
@@ -179,12 +179,12 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
     }
 
     @Test
-    public void testSuperUserCantListNamespaces() throws Exception {
+    public void testSuperUserCanListNamespaces() throws Exception {
         try (PulsarAdmin admin = buildAdminClient()) {
             admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
             admin.tenants().createTenant("tenant1",
-                                         new TenantInfoImpl(ImmutableSet.of("proxy"),
-                                                        ImmutableSet.of("test")));
+                                         new TenantInfoImpl(ImmutableSet.of(""),
+                                                 ImmutableSet.of("test")));
             admin.namespaces().createNamespace("tenant1/ns1");
             Assert.assertTrue(admin.namespaces().getNamespaces("tenant1").contains("tenant1/ns1"));
         }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 622cc7dc35f..b6985400c81 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.spy;
 
 import com.google.common.collect.Sets;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
@@ -57,9 +58,17 @@ import org.testng.collections.Maps;
 public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticatedProducerConsumerTest.class);
 
+    // Root for both proxy and client certificates
     private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
-    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
-    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
+
+    // Borrow certs for broker and proxy from other test
+    private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cert.pem";
+    private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-key.pem";
+    private final String TLS_BROKER_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/broker-cacert.pem";
+    private final String TLS_BROKER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/broker-cert.pem";
+    private final String TLS_BROKER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/broker-key.pem";
+
+    // This client cert is a superUser, so use that one
     private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
     private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
 
@@ -78,20 +87,23 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
         conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
-        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        conf.setTlsAllowInsecureConnection(true);
+        conf.setTlsCertificateFilePath(TLS_BROKER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_BROKER_KEY_FILE_PATH);
+        conf.setTlsAllowInsecureConnection(false);
         conf.setNumExecutorThreadPoolSize(5);
 
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("localhost");
         superUserRoles.add("superUser");
+        superUserRoles.add("Proxy");
         conf.setSuperUserRoles(superUserRoles);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
 
         conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         conf.setBrokerClientAuthenticationParameters(
-                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
-        conf.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+        conf.setBrokerClientTrustCertsFilePath(TLS_BROKER_TRUST_CERT_FILE_PATH);
+        conf.setBrokerClientTlsEnabled(true);
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         conf.setAuthenticationProviders(providers);
@@ -102,7 +114,6 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
 
         // start proxy service
         proxyConfig.setAuthenticationEnabled(true);
-        proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
@@ -110,16 +121,18 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
         proxyConfig.setTlsEnabledWithBroker(true);
+        // Setting advertised address to localhost to avoid hostname verification failure
+        proxyConfig.setAdvertisedAddress("localhost");
 
         // enable tls and auth&auth at proxy
-        proxyConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        proxyConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
+        proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
         proxyConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
 
         proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         proxyConfig.setBrokerClientAuthenticationParameters(
-                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
-        proxyConfig.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+                "tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);
+        proxyConfig.setBrokerClientTrustCertsFilePath(TLS_BROKER_TRUST_CERT_FILE_PATH);
         proxyConfig.setAuthenticationProviders(providers);
 
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
@@ -207,10 +220,11 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
     }
 
     protected final PulsarClient createPulsarClient(Authentication auth, String lookupUrl) throws Exception {
-        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString()).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
-                .allowTlsInsecureConnection(true).authentication(auth).build());
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
+                .tlsTrustCertsFilePath(TLS_BROKER_TRUST_CERT_FILE_PATH)
+                .enableTlsHostnameVerification(true).authentication(auth).build());
         return PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
-                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true).authentication(auth)
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTlsHostnameVerification(true).authentication(auth)
                 .enableTls(true).build();
 
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 0fad961ba21..23138929c84 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.spy;
 
 import com.google.common.collect.Sets;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
@@ -91,6 +92,7 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("superUser");
         conf.setSuperUserRoles(superUserRoles);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
 
         conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         conf.setBrokerClientAuthenticationParameters(
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 206b90a6909..9dc08d84848 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
 import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.SignatureAlgorithm;
 import java.util.Base64;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
@@ -56,7 +57,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
-    private static final Logger log = LoggerFactory.getLogger(ProxyWithAuthorizationTest.class);
+    private static final Logger log = LoggerFactory.getLogger(ProxyWithJwtAuthorizationTest.class);
 
     private final String ADMIN_ROLE = "admin";
     private final String PROXY_ROLE = "proxy";
@@ -85,6 +86,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
         superUserRoles.add(PROXY_ROLE);
         superUserRoles.add(BROKER_ROLE);
         conf.setSuperUserRoles(superUserRoles);
+        conf.setProxyRoles(Collections.singleton(PROXY_ROLE));
 
         conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
         conf.setBrokerClientAuthenticationParameters(BROKER_TOKEN);
diff --git a/tests/certificate-authority/generate_keystore.sh b/tests/certificate-authority/generate_keystore.sh
index cbddd53c456..faf808324b0 100755
--- a/tests/certificate-authority/generate_keystore.sh
+++ b/tests/certificate-authority/generate_keystore.sh
@@ -31,19 +31,30 @@ keytool -genkeypair -keystore broker.keystore.jks $COMMON_PARAMS -keyalg RSA -ke
   -dname 'CN=localhost,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown'
 keytool -genkeypair -keystore client.keystore.jks $COMMON_PARAMS -keyalg RSA -keysize 2048 -alias client -validity $DAYS \
   -dname 'CN=clientuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown'
+keytool -genkeypair -keystore proxy.keystore.jks $COMMON_PARAMS -keyalg RSA -keysize 2048 -alias proxy -validity $DAYS \
+  -dname 'CN=proxy,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown'
 
 # export certificate
 keytool -exportcert -keystore broker.keystore.jks $COMMON_PARAMS -file broker.cer -alias broker
 keytool -exportcert -keystore client.keystore.jks $COMMON_PARAMS -file client.cer -alias client
+keytool -exportcert -keystore proxy.keystore.jks $COMMON_PARAMS -file proxy.cer -alias proxy
 
 # generate truststore
 keytool -importcert -keystore client.truststore.jks $COMMON_PARAMS -file client.cer -alias truststore
 keytool -importcert -keystore broker.truststore.jks $COMMON_PARAMS -file broker.cer -alias truststore
+keytool -importcert -keystore proxy.truststore.jks $COMMON_PARAMS -file proxy.cer -file client.cer -alias truststore
+
+# generate a truststore containing both the proxy and client public certs
+keytool -importcert -keystore proxy-and-client.truststore.jks $COMMON_PARAMS -file proxy.cer -alias proxy
+keytool -importcert -keystore proxy-and-client.truststore.jks $COMMON_PARAMS -file client.cer -alias client
 
 # generate a truststore without password
 java ../RemoveJksPassword.java client.truststore.jks 111111 client.truststore.nopassword.jks
 java ../RemoveJksPassword.java broker.truststore.jks 111111 broker.truststore.nopassword.jks
+java ../RemoveJksPassword.java proxy.truststore.jks 111111 proxy.truststore.nopassword.jks
+java ../RemoveJksPassword.java proxy-and-client.truststore.jks 111111 proxy-and-client.truststore.nopassword.jks
 
 # cleanup
 rm broker.cer
 rm client.cer
+rm proxy.cer
diff --git a/tests/certificate-authority/jks/broker.keystore.jks b/tests/certificate-authority/jks/broker.keystore.jks
index 3495891ce9b..6f2df055f26 100644
Binary files a/tests/certificate-authority/jks/broker.keystore.jks and b/tests/certificate-authority/jks/broker.keystore.jks differ
diff --git a/tests/certificate-authority/jks/broker.truststore.jks b/tests/certificate-authority/jks/broker.truststore.jks
index 351b22b5af6..9c35356c540 100644
Binary files a/tests/certificate-authority/jks/broker.truststore.jks and b/tests/certificate-authority/jks/broker.truststore.jks differ
diff --git a/tests/certificate-authority/jks/broker.truststore.nopassword.jks b/tests/certificate-authority/jks/broker.truststore.nopassword.jks
index e4d6dff0047..75c3fd8012f 100644
Binary files a/tests/certificate-authority/jks/broker.truststore.nopassword.jks and b/tests/certificate-authority/jks/broker.truststore.nopassword.jks differ
diff --git a/tests/certificate-authority/jks/client.keystore.jks b/tests/certificate-authority/jks/client.keystore.jks
index 40c6cf4ff2f..0c9d33408e1 100644
Binary files a/tests/certificate-authority/jks/client.keystore.jks and b/tests/certificate-authority/jks/client.keystore.jks differ
diff --git a/tests/certificate-authority/jks/client.truststore.jks b/tests/certificate-authority/jks/client.truststore.jks
index d868d77d5db..ac59bd92541 100644
Binary files a/tests/certificate-authority/jks/client.truststore.jks and b/tests/certificate-authority/jks/client.truststore.jks differ
diff --git a/tests/certificate-authority/jks/client.truststore.nopassword.jks b/tests/certificate-authority/jks/client.truststore.nopassword.jks
index 22501ae5ec1..363fafab6be 100644
Binary files a/tests/certificate-authority/jks/client.truststore.nopassword.jks and b/tests/certificate-authority/jks/client.truststore.nopassword.jks differ
diff --git a/tests/certificate-authority/jks/proxy-and-client.truststore.jks b/tests/certificate-authority/jks/proxy-and-client.truststore.jks
new file mode 100644
index 00000000000..45a49018d8d
Binary files /dev/null and b/tests/certificate-authority/jks/proxy-and-client.truststore.jks differ
diff --git a/tests/certificate-authority/jks/proxy-and-client.truststore.nopassword.jks b/tests/certificate-authority/jks/proxy-and-client.truststore.nopassword.jks
new file mode 100644
index 00000000000..bd91f17e5b2
Binary files /dev/null and b/tests/certificate-authority/jks/proxy-and-client.truststore.nopassword.jks differ
diff --git a/tests/certificate-authority/jks/proxy.keystore.jks b/tests/certificate-authority/jks/proxy.keystore.jks
new file mode 100644
index 00000000000..d3b977560f1
Binary files /dev/null and b/tests/certificate-authority/jks/proxy.keystore.jks differ
diff --git a/tests/certificate-authority/jks/proxy.truststore.jks b/tests/certificate-authority/jks/proxy.truststore.jks
new file mode 100644
index 00000000000..0e13895b1c2
Binary files /dev/null and b/tests/certificate-authority/jks/proxy.truststore.jks differ
diff --git a/tests/certificate-authority/jks/proxy.truststore.nopassword.jks b/tests/certificate-authority/jks/proxy.truststore.nopassword.jks
new file mode 100644
index 00000000000..2a2729f1c3f
Binary files /dev/null and b/tests/certificate-authority/jks/proxy.truststore.nopassword.jks differ