You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/13 09:32:46 UTC

[pulsar] branch branch-2.6 updated: allowTopicOperationAsync should check the original role is super user (#1355) (#7788)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 2840d6f  allowTopicOperationAsync should check the original role is super user (#1355) (#7788)
2840d6f is described below

commit 2840d6f96be6ae7fc472fd821b820bbcf1d9868d
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Aug 12 06:02:51 2020 -0700

    allowTopicOperationAsync should check the original role is super user (#1355) (#7788)
    
    * Fix allowTopicOperationAsync logic (#1355)
    
    *Modifications*
    
    - Use the original role to verify whether a given topic operation is allowed
    - Use the original authentication data
    - Authz provider doesn't have to be aware of proxyRole
    - Fix authorization test
    
    * Refactor authorize logic to provide a uniform authorization behavior
    
    (cherry picked from commit 48f5a2f62c148b3df617be060fefed51f3145979)
---
 .../authorization/AuthorizationProvider.java       | 230 +++++++++++++++++----
 .../broker/authorization/AuthorizationService.java | 217 +++++++++++++------
 .../authorization/PulsarAuthorizationProvider.java | 103 ++++-----
 .../pulsar/broker/web/AuthenticationFilter.java    |   2 -
 .../broker/admin/impl/PersistentTopicsBase.java    |  30 +++
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../org/apache/pulsar/broker/service/Producer.java |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    | 229 ++++++++++----------
 .../pulsar/broker/web/PulsarWebResource.java       |  66 +++---
 .../pulsar/broker/service/ServerCnxTest.java       |  11 +-
 .../api/AuthorizationProducerConsumerTest.java     |  47 ++---
 .../pulsar/proxy/server/AdminProxyHandler.java     |   6 +-
 12 files changed, 598 insertions(+), 347 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 57147e7..0403f34 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -225,12 +226,19 @@ public interface AuthorizationProvider extends Closeable {
      * @param authData
      * @return CompletableFuture<Boolean>
      */
+    @Deprecated
     default CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
                                                             TenantOperation operation,
                                                             AuthenticationDataSource authData) {
-        return isTenantAdmin(tenantName, role, null, authData);
+        return allowTenantOperationAsync(
+            tenantName,
+            StringUtils.isBlank(originalRole) ? role : originalRole,
+            operation,
+            authData
+        );
     }
 
+    @Deprecated
     default Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation,
                                       AuthenticationDataSource authData) {
         try {
@@ -243,26 +251,93 @@ public interface AuthorizationProvider extends Closeable {
     }
 
     /**
+     * Check if a given <tt>role</tt> is allowed to execute a given <tt>operation</tt> on the tenant.
+     *
+     * @param tenantName tenant name
+     * @param role role name
+     * @param operation tenant operation
+     * @param authData authenticated data of the role
+     * @return a completable future that represents the check result
+     */
+    default CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role,
+                                                                 TenantOperation operation,
+                                                                 AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(new IllegalStateException(
+            String.format("allowTenantOperation(%s) on tenant %s is not supported by the Authorization" +
+                    " provider you are using.",
+                operation.toString(), tenantName)));
+    }
+
+    default Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation,
+                                         AuthenticationDataSource authData) {
+        try {
+            return allowTenantOperationAsync(tenantName, role, operation, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+    /**
+     * Check if a given <tt>role</tt> is allowed to execute a given <tt>operation</tt> on the namespace.
+     *
+     * @param namespaceName namespace name
+     * @param role role name
+     * @param operation namespace operation
+     * @param authData authenticated data
+     * @return a completable future that represents the check result
+     */
+    default CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                    String role,
+                                                                    NamespaceOperation operation,
+                                                                    AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+            new IllegalStateException("NamespaceOperation is not supported by the Authorization provider you are using."));
+    }
+
+    default Boolean allowNamespaceOperation(NamespaceName namespaceName,
+                                            String role,
+                                            NamespaceOperation operation,
+                                            AuthenticationDataSource authData) {
+        try {
+            return allowNamespaceOperationAsync(namespaceName, role, operation, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+    /**
      * Grant authorization-action permission on a namespace to the given client
+     *
      * @param namespaceName
-     * @param originalRole role not overriden by proxy role if request do pass through proxy
-     * @param role originalRole | proxyRole if the request didn't pass through proxy
+     * @param role
      * @param operation
      * @param authData
      * @return CompletableFuture<Boolean>
      */
-    default CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
-                                                                 String role, NamespaceOperation operation,
-                                                                 AuthenticationDataSource authData) {
-        return FutureUtil.failedFuture(
-            new IllegalStateException(
-                    String.format("NamespaceOperation(%s) on namespace(%s) by role(%s) is not supported" +
-                    " by the Authorization provider you are using.",
-                            operation.toString(), namespaceName.toString(), role == null ? "null" : role)));
+    @Deprecated
+    default CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                    String originalRole,
+                                                                    String role,
+                                                                    NamespaceOperation operation,
+                                                                    AuthenticationDataSource authData) {
+        return allowNamespaceOperationAsync(
+            namespaceName,
+            StringUtils.isBlank(originalRole) ? role : originalRole,
+            operation,
+            authData
+        );
     }
 
-    default Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role,
-                                         NamespaceOperation operation, AuthenticationDataSource authData) {
+    @Deprecated
+    default Boolean allowNamespaceOperation(NamespaceName namespaceName,
+                                            String originalRole,
+                                            String role,
+                                            NamespaceOperation operation,
+                                            AuthenticationDataSource authData) {
         try {
             return allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData).get();
         } catch (InterruptedException e) {
@@ -273,6 +348,39 @@ public interface AuthorizationProvider extends Closeable {
     }
 
     /**
+     * Check if a given <tt>role</tt> is allowed to execute a given policy <tt>operation</tt> on the namespace.
+     *
+     * @param namespaceName namespace name
+     * @param policy policy name
+     * @param operation policy operation
+     * @param role role name
+     * @param authData authenticated data
+     * @return a completable future that represents the check result
+     */
+    default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                          PolicyName policy,
+                                                                          PolicyOperation operation,
+                                                                          String role,
+                                                                          AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+                new IllegalStateException("NamespacePolicyOperation is not supported by the Authorization provider you are using."));
+    }
+
+    default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+                                                  PolicyName policy,
+                                                  PolicyOperation operation,
+                                                  String role,
+                                                  AuthenticationDataSource authData) {
+        try {
+            return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+    /**
      * Grant authorization-action permission on a namespace to the given client
      * @param namespaceName
      * @param originalRole role not overriden by proxy role if request do pass through proxy
@@ -281,16 +389,32 @@ public interface AuthorizationProvider extends Closeable {
      * @param authData
      * @return CompletableFuture<Boolean>
      */
-    default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
-                                                                          PolicyOperation operation, String originalRole,
-                                                                          String role, AuthenticationDataSource authData) {
-        return isTenantAdmin(namespaceName.getTenant(), role, null, authData);
+    @Deprecated
+    default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                          PolicyName policy,
+                                                                          PolicyOperation operation,
+                                                                          String originalRole,
+                                                                          String role,
+                                                                          AuthenticationDataSource authData) {
+        return allowNamespacePolicyOperationAsync(
+            namespaceName,
+            policy,
+            operation,
+            StringUtils.isBlank(originalRole) ? role : originalRole,
+            authData
+        );
     }
 
-    default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation,
-                                                  String originalRole, String role, AuthenticationDataSource authData) {
+    @Deprecated
+    default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+                                                  PolicyName policy,
+                                                  PolicyOperation operation,
+                                                  String originalRole,
+                                                  String role,
+                                                  AuthenticationDataSource authData) {
         try {
-            return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData).get();
+            return allowNamespacePolicyOperationAsync(
+                namespaceName, policy, operation, originalRole, role, authData).get();
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -298,6 +422,35 @@ public interface AuthorizationProvider extends Closeable {
         }
     }
 
+    /**
+     * Check if a given <tt>role</tt> is allowed to execute a given topic <tt>operation</tt> on the topic.
+     *
+     * @param topic topic name
+     * @param role role name
+     * @param operation topic operation
+     * @param authData authenticated data
+     * @return CompletableFuture<Boolean>
+     */
+    default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+                                                                String role,
+                                                                TopicOperation operation,
+                                                                AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+            new IllegalStateException("TopicOperation is not supported by the Authorization provider you are using."));
+    }
+
+    default Boolean allowTopicOperation(TopicName topicName,
+                                        String role,
+                                        TopicOperation operation,
+                                        AuthenticationDataSource authData) {
+        try {
+            return allowTopicOperationAsync(topicName, role, operation, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
 
     /**
      * Grant authorization-action permission on a topic to the given client
@@ -308,27 +461,26 @@ public interface AuthorizationProvider extends Closeable {
      * @param authData
      * @return CompletableFuture<Boolean>
      */
-    default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role,
-                                                             TopicOperation operation,
-                                                             AuthenticationDataSource authData) {
-        switch (operation) {
-            case PRODUCE:
-                return canProduceAsync(topic, role, authData);
-            case CONSUME:
-                return canConsumeAsync(topic, role, authData, null);
-            case LOOKUP:
-                return canLookupAsync(topic, role, authData);
-            default:
-                return FutureUtil.failedFuture(
-                        new IllegalStateException(
-                                String.format("TopicOperation(%s) on topic(%s) by role(%s) is not supported" +
-                                                " by the Authorization provider you are using.",
-                                        operation.toString(), topic.toString(), role == null ? "null" : null)));
-        }
+    @Deprecated
+    default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+                                                                String originalRole,
+                                                                String role,
+                                                                TopicOperation operation,
+                                                                AuthenticationDataSource authData) {
+        return allowTopicOperationAsync(
+            topic,
+            StringUtils.isBlank(originalRole) ? role : originalRole,
+            operation,
+            authData
+        );
     }
 
-    default Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation,
-                                     AuthenticationDataSource authData) {
+    @Deprecated
+    default Boolean allowTopicOperation(TopicName topicName,
+                                        String originalRole,
+                                        String role,
+                                        TopicOperation operation,
+                                        AuthenticationDataSource authData) {
         try {
             return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
         } catch (InterruptedException e) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index b91d616..afa85ee 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.authorization;
 
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
-import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -35,12 +37,10 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
@@ -341,45 +341,84 @@ public class AuthorizationService {
         return provider.allowSinkOpsAsync(namespaceName, role, authenticationData);
     }
 
+    private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
+                                                  String originalPrincipal) {
+        if (proxyRoles.contains(authenticatedPrincipal)) {
+            // Request has come from a proxy
+            if (StringUtils.isBlank(originalPrincipal)) {
+                log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
+                throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
+            }
+            if (proxyRoles.contains(originalPrincipal)) {
+                log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
+                throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
+            }
+        }
+    }
+
+    private boolean isProxyRole(String role) {
+        return role != null && conf.getProxyRoles().contains(role);
+    }
+
     /**
      * Grant authorization-action permission on a tenant to the given client
      *
-     * @param tenantName
-     * @param operation
-     * @param originalRole
-     * @param role
+     * @param tenantName tenant name
+     * @param operation tenant operation
+     * @param role role name
      * @param authData
      *            additional authdata in json for targeted authorization provider
      * @return IllegalArgumentException when tenant not found
      * @throws IllegalStateException
      *             when failed to grant permission
      */
-    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, TenantOperation operation,
-                                                                 String originalRole, String role,
-                                                                 AuthenticationDataSource authData) {
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
+                                                                TenantOperation operation,
+                                                                String role,
+                                                                AuthenticationDataSource authData) {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
 
         if (provider != null) {
-            return provider.allowTenantOperationAsync(tenantName, originalRole, role, operation, authData);
+            return provider.allowTenantOperationAsync(tenantName, role, operation, authData);
         }
 
         return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
                 "allowTenantOperationAsync"));
     }
 
-    public Boolean allowTenantOperation(String tenantName, TenantOperation operation, String orignalRole, String role,
-                                        AuthenticationDataSource authData) {
-        if (!this.conf.isAuthorizationEnabled()) {
-            return true;
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
+                                                                TenantOperation operation,
+                                                                String originalRole,
+                                                                String role,
+                                                                AuthenticationDataSource authData) {
+        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (isProxyRole(role)) {
+            CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync(
+                tenantName, operation, role, authData);
+            CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTenantOperationAsync(
+                tenantName, operation, originalRole, authData);
+            return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+                (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized);
+        } else {
+            return allowTenantOperationAsync(tenantName, operation, role, authData);
         }
+    }
 
-        if (provider != null) {
-            return provider.allowTenantOperation(tenantName, orignalRole, role, operation, authData);
+    public boolean allowTenantOperation(String tenantName,
+                                        TenantOperation operation,
+                                        String originalRole,
+                                        String role,
+                                        AuthenticationDataSource authData) {
+        try {
+            return allowTenantOperationAsync(
+                tenantName, operation, originalRole, role, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
         }
-
-        throw new IllegalStateException("No authorization provider configured for allowTenantOperation");
     }
 
     /**
@@ -387,7 +426,6 @@ public class AuthorizationService {
      *
      * @param namespaceName
      * @param operation
-     * @param originalRole
      * @param role
      * @param authData
      *            additional authdata in json for targeted authorization provider
@@ -397,31 +435,51 @@ public class AuthorizationService {
      */
     public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
                                                                    NamespaceOperation operation,
-                                                                   String originalRole, String role,
+                                                                   String role,
                                                                    AuthenticationDataSource authData) {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
 
         if (provider != null) {
-            return provider.allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData);
+            return provider.allowNamespaceOperationAsync(namespaceName, role, operation, authData);
         }
 
         return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
                 "allowNamespaceOperationAsync"));
     }
 
-    public Boolean allowNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation,
-                                           String originalPrincipal, String role, AuthenticationDataSource authData) {
-        if (!this.conf.isAuthorizationEnabled()) {
-            return true;
+    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                   NamespaceOperation operation,
+                                                                   String originalRole,
+                                                                   String role,
+                                                                   AuthenticationDataSource authData) {
+        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (isProxyRole(role)) {
+            CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync(
+                namespaceName, operation, role, authData);
+            CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespaceOperationAsync(
+                namespaceName, operation, originalRole, authData);
+            return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+                (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized);
+        } else {
+            return allowNamespaceOperationAsync(namespaceName, operation, role, authData);
         }
+    }
 
-        if (provider != null) {
-            return provider.allowNamespaceOperation(namespaceName, originalPrincipal, role, operation, authData);
+    public boolean allowNamespaceOperation(NamespaceName namespaceName,
+                                           NamespaceOperation operation,
+                                           String originalRole,
+                                           String role,
+                                           AuthenticationDataSource authData) {
+        try {
+            return allowNamespaceOperationAsync(
+                namespaceName, operation, originalRole, role, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
         }
-
-        throw new IllegalStateException("No authorization provider configured for allowNamespaceOperation");
     }
 
     /**
@@ -429,7 +487,6 @@ public class AuthorizationService {
      *
      * @param namespaceName
      * @param operation
-     * @param originalRole
      * @param role
      * @param authData
      *            additional authdata in json for targeted authorization provider
@@ -437,33 +494,56 @@ public class AuthorizationService {
      * @throws IllegalStateException
      *             when failed to grant permission
      */
-    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
-                                                                         PolicyOperation operation, String originalRole,
-                                                                         String role, AuthenticationDataSource authData) {
+    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                         PolicyName policy,
+                                                                         PolicyOperation operation,
+                                                                         String role,
+                                                                         AuthenticationDataSource authData) {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
 
         if (provider != null) {
-            return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData);
+            return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData);
         }
 
         return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
                 "allowNamespacePolicyOperationAsync"));
     }
 
-    public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy,
-                                                 PolicyOperation operation, String originalPrincipal, String role,
-                                                 AuthenticationDataHttps authData) {
-        if (!this.conf.isAuthorizationEnabled()) {
-            return true;
+    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                         PolicyName policy,
+                                                                         PolicyOperation operation,
+                                                                         String originalRole,
+                                                                         String role,
+                                                                         AuthenticationDataSource authData) {
+        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (isProxyRole(role)) {
+            CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
+                namespaceName, policy, operation, role, authData);
+            CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespacePolicyOperationAsync(
+                namespaceName, policy, operation, originalRole, authData);
+            return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+                (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized);
+        } else {
+            return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData);
         }
+    }
 
-        if (provider != null) {
-            return provider.allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal, role, authData);
+    /** Blocking form of allowNamespacePolicyOperationAsync(.., originalRole, role, ..); errors become RestException. */
+    public boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy,
+                                                 PolicyOperation operation, String originalRole, String role,
+                                                 AuthenticationDataSource authData) {
+        try {
+            return allowNamespacePolicyOperationAsync(
+                namespaceName, policy, operation, originalRole, role, authData).get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
         }
-
-        throw new IllegalStateException("No authorization provider configured for allowNamespacePolicyOperation");
     }
 
     /**
@@ -478,32 +558,43 @@ public class AuthorizationService {
      * @throws IllegalStateException
      *             when failed to grant permission
      */
-    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, TopicOperation operation,
-                                                               String originalRole, String role,
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
+                                                               TopicOperation operation,
+                                                               String role,
                                                                AuthenticationDataSource authData) {
+        if (log.isDebugEnabled()) {
+            log.debug("Check if role {} is allowed to execute topic operation {} on topic {}",
+                role, operation, topicName);
+        }
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
 
         if (provider != null) {
-            return provider.allowTopicOperationAsync(topicName, originalRole, role, operation, authData);
+            CompletableFuture<Boolean> allowFuture =
+                provider.allowTopicOperationAsync(topicName, role, operation, authData);
+            if (log.isDebugEnabled()) {
+                return allowFuture.whenComplete((allowed, exception) -> {
+                    if (exception == null) {
+                        if (allowed) {
+                            log.debug("Topic operation {} on topic {} is allowed: role = {}",
+                                operation, topicName, role);
+                        } else{
+                            log.debug("Topic operation {} on topic {} is NOT allowed: role = {}",
+                                operation, topicName, role);
+                        }
+                    } else {
+                        log.debug("Failed to check if topic operation {} on topic {} is allowed:"
+                                + " role = {}",
+                            operation, topicName, role, exception);
+                    }
+                });
+            } else {
+                return allowFuture;
+            }
         }
 
         return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
                 "allowTopicOperationAsync"));
     }
-
-    public Boolean allowTopicOperation(TopicName topicName, TopicOperation operation,
-                                         String orignalRole, String role,
-                                         AuthenticationDataSource authData) {
-        if (!this.conf.isAuthorizationEnabled()) {
-            return true;
-        }
-
-        if (provider != null) {
-            return provider.allowTopicOperation(topicName, orignalRole, role, operation, authData);
-        }
-
-        throw new IllegalStateException("No authorization provider configured for allowTopicOperation");
-    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index d7cea44..66d0c2e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.authorization;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -29,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import com.google.common.base.Joiner;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -39,8 +39,6 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.Policies;
-import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
-
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -528,38 +526,43 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
     }
 
     @Override
-    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
-                                                           TenantOperation operation,
-                                                           AuthenticationDataSource authData) {
-        return validateTenantAdminAccess(tenantName, originalRole, role, authData);
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
+                                                                String role,
+                                                                TenantOperation operation,
+                                                                AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(tenantName, role, authData);
     }
 
     @Override
-    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
-                                                              String role, NamespaceOperation operation,
-                                                              AuthenticationDataSource authData) {
-        return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                   String role,
+                                                                   NamespaceOperation operation,
+                                                                   AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
     }
 
     @Override
-    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
-                                                                         PolicyOperation operation, String originalRole,
-                                                                         String role, AuthenticationDataSource authData) {
-        return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                         PolicyName policy,
+                                                                         PolicyOperation operation,
+                                                                         String role,
+                                                                         AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
     }
 
     @Override
-    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, String originalRole, String role,
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
+                                                               String role,
                                                                TopicOperation operation,
                                                                AuthenticationDataSource authData) {
         CompletableFuture<Boolean> isAuthorizedFuture;
 
         switch (operation) {
-            case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, StringUtils.isBlank(originalRole) ? role : originalRole, authData);
+            case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, role, authData);
                 break;
-            case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName, StringUtils.isBlank(originalRole) ? role : originalRole, authData);
+            case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName, role, authData);
                 break;
-            case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, StringUtils.isBlank(originalRole) ? role : originalRole, authData, authData.getSubscription());
+            case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription());
                 break;
             default: isAuthorizedFuture = FutureUtil.failedFuture(
                     new IllegalStateException("TopicOperation is not supported."));
@@ -568,7 +571,14 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
         CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, authData, conf);
 
         return isSuperUserFuture
-                .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> isSuperUser || isAuthorized);
+                .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Verify if role {} is allowed to {} to topic {}:"
+                                + " isSuperUser={}, isAuthorized={}",
+                            role, operation, topicName, isSuperUser, isAuthorized);
+                    }
+                    return isSuperUser || isAuthorized;
+                });
     }
 
     private static String path(String... parts) {
@@ -578,43 +588,20 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
         return sb.toString();
     }
 
-    private CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName, String originalRole, String role,
+    private CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName,
+                                                                 String role,
                                                                 AuthenticationDataSource authData) {
         try {
             TenantInfo tenantInfo = configCache.propertiesCache()
                     .get(path(POLICIES, tenantName))
                     .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
 
-            validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
-
-            if (role != null && conf.getProxyRoles().contains(role)) {
-                // role check
-                CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, authData, conf);
-                CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
-                CompletableFuture<Boolean> isRoleAuthorizedFuture = isRoleSuperUserFuture
-                        .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
-                                isRoleSuperUser || isRoleTenantAdmin);
-
-                // originalRole check
-                CompletableFuture<Boolean> isOriginalRoleSuperUserFuture = isSuperUser(originalRole, authData, conf);
-                CompletableFuture<Boolean> isOriginalRoleTenantAdminFuture = isTenantAdmin(tenantName, originalRole,
-                        tenantInfo, authData);
-                CompletableFuture<Boolean> isOriginalRoleAuthorizedFuture = isOriginalRoleSuperUserFuture
-                        .thenCombine(isOriginalRoleTenantAdminFuture, (isOriginalRoleSuperUser, isOriginalRoleTenantAdmin) ->
-                                isOriginalRoleSuperUser || isOriginalRoleTenantAdmin);
-
-                // merging
-                return isRoleAuthorizedFuture
-                        .thenCombine(isOriginalRoleAuthorizedFuture, (isRoleAuthorized, isOriginalRoleAuthorized) ->
-                                isRoleAuthorized && isOriginalRoleAuthorized);
-            } else {
-                // role check
-                CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, authData, conf);
-                CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
-                return isRoleSuperUserFuture
-                        .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
-                                isRoleSuperUser || isRoleTenantAdmin);
-            }
+            // role check
+            CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, authData, conf);
+            CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
+            return isRoleSuperUserFuture
+                    .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
+                            isRoleSuperUser || isRoleTenantAdmin);
         } catch (KeeperException.NoNodeException e) {
             log.warn("Failed to get tenant info data for non existing tenant {}", tenantName);
             throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
@@ -624,18 +611,4 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
         }
     }
 
-    private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
-                                                  String originalPrincipal) {
-        if (proxyRoles.contains(authenticatedPrincipal)) {
-            // Request has come from a proxy
-            if (StringUtils.isBlank(originalPrincipal)) {
-                log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
-                throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
-            }
-            if (proxyRoles.contains(originalPrincipal)) {
-                log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
-                throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
-            }
-        }
-    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
index 6b4fc8c..8555f34 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.web;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.io.IOException;
 
 import javax.servlet.Filter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b3f839b..5944871 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2368,6 +2368,36 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     /**
+     * Get partitioned topic metadata without checking the permission.
+     */
+    public static CompletableFuture<PartitionedTopicMetadata> unsafeGetPartitionedTopicMetadataAsync(
+        PulsarService pulsar, TopicName topicName) {
+        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
+
+        // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
+        // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
+        // producer/consumer
+        checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
+            .thenCompose(res -> pulsar.getBrokerService()
+                .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+            .thenAccept(metadata -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Total number of partitions for topic {} is {}", topicName,
+                        metadata.partitions);
+                }
+                metadataFuture.complete(metadata);
+            }).exceptionally(ex -> {
+                // Dependent stages normally wrap the failure in a CompletionException; unwrap it, but
+                // fall back to the exception itself so a missing cause never becomes a null completion.
+                Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+                metadataFuture.completeExceptionally(cause);
+                return null;
+            });
+
+        return metadataFuture;
+    }
+
+    /**
      * Get the Topic object reference from the Pulsar broker
      */
     private Topic getTopicReference(TopicName topicName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 2131b18..03bbe12 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -152,7 +152,7 @@ public class Consumer {
         this.bytesOutCounter = new LongAdder();
         this.msgOutCounter = new LongAdder();
         this.appId = appId;
-        this.authenticationData = cnx.authenticationData;
+        this.authenticationData = cnx.getAuthenticationData();
         this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl();
 
         PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index b19c4b8..ba84138 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -99,7 +99,7 @@ public class Producer {
         this.epoch = epoch;
         this.closeFuture = new CompletableFuture<>();
         this.appId = appId;
-        this.authenticationData = cnx.authenticationData;
+        this.authenticationData = cnx.getAuthenticationData();
         this.msgIn = new Rate();
         this.chuckedMessageRate = new Rate();
         this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0b7499d..5d49f87 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.getPartitionedTopicMetadata;
+import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
 import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
 import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
@@ -142,6 +143,7 @@ public class ServerCnx extends PulsarHandler {
     // In case of proxy, if the authentication credentials are forwardable,
     // it will hold the credentials of the original client
     AuthenticationState originalAuthState;
+    AuthenticationDataSource originalAuthData;
     private boolean pendingAuthChallengeResponse = false;
 
     // Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
@@ -274,6 +276,65 @@ public class ServerCnx extends PulsarHandler {
     // // Incoming commands handling
     // ////
 
+    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation) {
+        CompletableFuture<Boolean> isProxyAuthorizedFuture;
+        CompletableFuture<Boolean> isAuthorizedFuture;
+        if (service.isAuthorizationEnabled()) {
+            if (originalPrincipal != null) {
+                isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
+                    topicName, operation, originalPrincipal, getAuthenticationData());
+            } else {
+                isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            }
+            isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
+                topicName, operation, authRole, authenticationData);
+        } else {
+            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            isAuthorizedFuture = CompletableFuture.completedFuture(true);
+        }
+        return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
+            if (!isProxyAuthorized) {
+                log.error("OriginalRole {} is not authorized to perform operation {} on topic {}",
+                    originalPrincipal, operation, topicName);
+            }
+            if (!isAuthorized) {
+                log.error("Role {} is not authorized to perform operation {} on topic {}",
+                    authRole, operation, topicName);
+            }
+            return isProxyAuthorized && isAuthorized;
+        });
+    }
+
+    /**
+     * Checks whether this connection may perform {@code operation} on {@code topicName} for a
+     * subscription. When authorization is enabled, the subscription name is first recorded on the
+     * connection's authentication data (and on the original auth data when present).
+     *
+     * @param topicName        topic the operation targets
+     * @param subscriptionName subscription name to attach to the authentication data
+     * @param operation        topic operation being attempted
+     * @return future completing with {@code true} when the operation is allowed
+     */
+    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName, TopicOperation operation) {
+        if (!service.isAuthorizationEnabled()) {
+            // Authorization is off: nothing can be denied, so answer immediately.
+            return CompletableFuture.completedFuture(true);
+        }
+        // Attach the subscription name to the auth data before delegating the check;
+        // create a placeholder auth data object when the connection has none yet.
+        if (authenticationData == null) {
+            authenticationData = new AuthenticationDataCommand("", subscriptionName);
+        } else {
+            authenticationData.setSubscription(subscriptionName);
+        }
+        // Keep the original auth data, when there is one, in sync as well.
+        if (originalAuthData != null) {
+            originalAuthData.setSubscription(subscriptionName);
+        }
+        return isTopicOperationAllowed(topicName, operation);
+    }
+
+
     @Override
     protected void handleLookup(CommandLookupTopic lookup) {
         final long requestId = lookup.getRequestId();
@@ -298,18 +359,10 @@ public class ServerCnx extends PulsarHandler {
                 lookupSemaphore.release();
                 return;
             }
-            CompletableFuture<Boolean> isProxyAuthorizedFuture;
-            if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-                isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                        TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
-            } else {
-                isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
-            }
-            String finalOriginalPrincipal = originalPrincipal;
-            isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
-                if (isProxyAuthorized) {
+            isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
+                if (isAuthorized) {
                     lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
-                            finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
+                            getPrincipal(), getAuthenticationData(),
                             requestId, advertisedListenerName).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
                                     ctx.writeAndFlush(lookupResponse);
@@ -325,14 +378,14 @@ public class ServerCnx extends PulsarHandler {
                             });
                 } else {
                     final String msg = "Proxy Client is not authorized to Lookup";
-                    log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                    log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
                     ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
                     lookupSemaphore.release();
                 }
                 return null;
             }).exceptionally(ex -> {
                 final String msg = "Exception occured while trying to authorize lookup";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName, ex);
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName, ex);
                 ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
                 lookupSemaphore.release();
                 return null;
@@ -370,19 +423,10 @@ public class ServerCnx extends PulsarHandler {
                 lookupSemaphore.release();
                 return;
             }
-            CompletableFuture<Boolean> isProxyAuthorizedFuture;
-            if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-                isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                        TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
-            } else {
-                isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
-            }
-            String finalOriginalPrincipal = originalPrincipal;
-            isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
-                if (isProxyAuthorized) {
-                    getPartitionedTopicMetadata(getBrokerService().pulsar(),
-                            authRole, finalOriginalPrincipal, authenticationData,
-                            topicName).handle((metadata, ex) -> {
+            isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
+                        .handle((metadata, ex) -> {
                                 if (ex == null) {
                                     int partitions = metadata.partitions;
                                     ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
@@ -408,7 +452,7 @@ public class ServerCnx extends PulsarHandler {
                             });
                 } else {
                     final String msg = "Proxy Client is not authorized to Get Partition Metadata";
-                    log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                    log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
                     ctx.writeAndFlush(
                             Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
                     lookupSemaphore.release();
@@ -416,7 +460,7 @@ public class ServerCnx extends PulsarHandler {
                 return null;
             }).exceptionally(ex -> {
                 final String msg = "Exception occured while trying to authorize get Partition Metadata";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
                 ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
                 lookupSemaphore.release();
                 return null;
@@ -506,6 +550,9 @@ public class ServerCnx extends PulsarHandler {
         String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;
         AuthData brokerData = authState.authenticate(clientData);
 
+        if (log.isDebugEnabled()) {
+            log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole);
+        }
 
         if (authState.isComplete()) {
             // Authentication has completed. It was either:
@@ -521,7 +568,7 @@ public class ServerCnx extends PulsarHandler {
 
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
-                        remoteAddress, authMethod, authRole, originalPrincipal);
+                        remoteAddress, authMethod, this.authRole, originalPrincipal);
             }
 
             if (state != State.Connected) {
@@ -608,8 +655,12 @@ public class ServerCnx extends PulsarHandler {
         checkArgument(state == State.Start);
 
         if (log.isDebugEnabled()) {
-            log.debug("Received CONNECT from {}, auth enabled: {}",
-                remoteAddress, service.isAuthenticationEnabled());
+            log.debug("Received CONNECT from {}, auth enabled: {}:"
+                    + " has original principal = {}, original principal = {}",
+                remoteAddress,
+                service.isAuthenticationEnabled(),
+                connect.hasOriginalPrincipal(),
+                connect.getOriginalPrincipal());
         }
 
         String clientVersion = connect.getClientVersion();
@@ -657,6 +708,12 @@ public class ServerCnx extends PulsarHandler {
 
             authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
             authenticationData = authState.getAuthDataSource();
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Authenticate role : {}", remoteAddress,
+                    authState != null ? authState.getAuthRole() : null);
+            }
+
             state = doAuthentication(clientData, clientProtocolVersion, clientVersion);
 
             // This will fail the check if:
@@ -685,9 +742,18 @@ public class ServerCnx extends PulsarHandler {
                         AuthData.of(connect.getOriginalAuthData().getBytes()),
                         remoteAddress,
                         sslSession);
+                originalAuthData = originalAuthState.getAuthDataSource();
                 originalPrincipal = originalAuthState.getAuthRole();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Authenticate original role : {}", remoteAddress, originalPrincipal);
+                }
             } else {
                 originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Authenticate original role (forwarded from proxy): {}",
+                        remoteAddress, originalPrincipal);
+                }
             }
         } catch (Exception e) {
             String msg = "Unable to authenticate";
@@ -738,6 +804,11 @@ public class ServerCnx extends PulsarHandler {
             return;
         }
 
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}",
+                remoteAddress, authRole, originalPrincipal);
+        }
+
         if (invalidOriginalPrincipal(originalPrincipal)) {
             final String msg = "Valid Proxy Client role should be provided while subscribing ";
             log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
@@ -766,33 +837,15 @@ public class ServerCnx extends PulsarHandler {
         final boolean forceTopicCreation = subscribe.getForceTopicCreation();
         final PulsarApi.KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? subscribe.getKeySharedMeta() : null;
 
-        CompletableFuture<Boolean> isProxyAuthorizedFuture;
-        if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            authenticationData.setSubscription(subscriptionName);
-            isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                    TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
-        } else {
-            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
-        }
-        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
-            if (isProxyAuthorized) {
-                CompletableFuture<Boolean> authorizationFuture;
-                if (service.isAuthorizationEnabled()) {
-                    if (authenticationData == null) {
-                        authenticationData = new AuthenticationDataCommand("", subscriptionName);
-                    } else {
-                        authenticationData.setSubscription(subscriptionName);
-                    }
-                    authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                            TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
-                } else {
-                    authorizationFuture = CompletableFuture.completedFuture(true);
-                }
-
-                authorizationFuture.thenApply(isAuthorized -> {
+        CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
+            topicName,
+            subscriptionName,
+            TopicOperation.CONSUME
+        );
+        isAuthorizedFuture.thenApply(isAuthorized -> {
                     if (isAuthorized) {
                         if (log.isDebugEnabled()) {
-                            log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
+                            log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, getPrincipal());
                         }
 
                         log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
@@ -920,24 +973,12 @@ public class ServerCnx extends PulsarHandler {
                                 });
                     } else {
                         String msg = "Client is not authorized to subscribe";
-                        log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                        log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
                         ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
                     }
                     return null;
-                }).exceptionally(e -> {
-                    String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
-                    log.warn(msg);
-                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
-                    return null;
-                });
-            } else {
-                final String msg = "Proxy Client is not authorized to subscribe";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
-                ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
-            }
-            return null;
         }).exceptionally(ex -> {
-            String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
+            String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
             if (ex.getCause() instanceof PulsarServerException) {
                 log.info(msg);
             } else {
@@ -990,27 +1031,13 @@ public class ServerCnx extends PulsarHandler {
             return;
         }
 
-        CompletableFuture<Boolean> isProxyAuthorizedFuture;
-        if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                    TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
-        } else {
-            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
-        }
-        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
-            if (isProxyAuthorized) {
-                CompletableFuture<Boolean> authorizationFuture;
-                if (service.isAuthorizationEnabled()) {
-                    authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                            TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
-                } else {
-                    authorizationFuture = CompletableFuture.completedFuture(true);
-                }
-
-                authorizationFuture.thenApply(isAuthorized -> {
+        CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
+            topicName, TopicOperation.PRODUCE
+        );
+        isAuthorizedFuture.thenApply(isAuthorized -> {
                     if (isAuthorized) {
                         if (log.isDebugEnabled()) {
-                            log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
+                            log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, getPrincipal());
                         }
                         CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
                         CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId,
@@ -1093,7 +1120,7 @@ public class ServerCnx extends PulsarHandler {
                             });
 
                             schemaVersionFuture.thenAccept(schemaVersion -> {
-                                Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
+                                Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, getPrincipal(),
                                     isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName);
 
                                 try {
@@ -1154,24 +1181,12 @@ public class ServerCnx extends PulsarHandler {
                         });
                     } else {
                         String msg = "Client is not authorized to Produce";
-                        log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                        log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
                         ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
                     }
                     return null;
-                }).exceptionally(e -> {
-                    String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
-                    log.warn(msg);
-                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
-                    return null;
-                });
-            } else {
-                final String msg = "Proxy Client is not authorized to Produce";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
-                ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
-            }
-            return null;
         }).exceptionally(ex -> {
-            String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
+            String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
             log.warn(msg);
             ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage()));
             return null;
@@ -1989,7 +2004,11 @@ public class ServerCnx extends PulsarHandler {
     }
 
     public AuthenticationDataSource getAuthenticationData() {
-        return authenticationData;
+        return originalAuthData != null ? originalAuthData : authenticationData;
+    }
+
+    public String getPrincipal() {
+        return originalPrincipal != null ? originalPrincipal : authRole;
     }
 
     public AuthenticationProvider getAuthenticationProvider() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index da21f72..f1e2347 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -68,7 +68,6 @@ import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
-import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -422,10 +421,12 @@ public abstract class PulsarWebResource {
      * will throw an exception to redirect to assigned owner or leader; if authoritative is true then it will try to
      * acquire all the namespace bundles.
      *
-     * @param fqnn
-     * @param authoritative
-     * @param readOnly
-     * @param bundleData
+     * @param tenant tenant name
+     * @param cluster cluster name
+     * @param namespace namespace name
+     * @param authoritative whether the request is authoritative
+     * @param readOnly whether the request is read-only
+     * @param bundleData bundle data
      */
     protected void validateNamespaceOwnershipWithBundles(String tenant, String cluster, String namespace,
             boolean authoritative, boolean readOnly, BundlesData bundleData) {
@@ -582,11 +583,8 @@ public abstract class PulsarWebResource {
      * client to the appropriate broker. If no broker owns the namespace yet, this function will try to acquire the
      * ownership by default.
      *
+     * @param topicName topic name
      * @param authoritative
-     *
-     * @param tenant
-     * @param cluster
-     * @param namespace
      */
     protected void validateTopicOwnership(TopicName topicName, boolean authoritative) {
         NamespaceService nsService = pulsar().getNamespaceService();
@@ -794,31 +792,33 @@ public abstract class PulsarWebResource {
     protected static final int NOT_IMPLEMENTED = 501;
 
     public void validateTenantOperation(String tenant, TenantOperation operation) {
-        if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled()
+            && pulsar().getBrokerService().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId())) {
                 throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
             }
 
-            Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
-                    .allowTenantOperation(
-                            tenant, operation, originalPrincipal(), clientAppId(), clientAuthData());
-
+            boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+                .allowTenantOperation(tenant, operation, originalPrincipal(), clientAppId(), clientAuthData());
             if (!isAuthorized) {
-                throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTenantOperation for" +
-                                " originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]",
+                throw new RestException(Status.UNAUTHORIZED,
+                    String.format("Unauthorized to validateTenantOperation for"
+                            + " originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]",
                         originalPrincipal(), clientAppId(), operation.toString(), tenant));
             }
         }
     }
 
     public void validateNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation) {
-        if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled()
+            && pulsar().getBrokerService().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId())) {
                 throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
             }
 
-            Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
-                    .allowNamespaceOperation(namespaceName, operation, originalPrincipal(), clientAppId(), clientAuthData());
+            boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+                    .allowNamespaceOperation(namespaceName, operation, originalPrincipal(),
+                        clientAppId(), clientAuthData());
 
             if (!isAuthorized) {
                 throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespaceOperation for" +
@@ -827,14 +827,18 @@ public abstract class PulsarWebResource {
         }
     }
 
-    public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation) {
-        if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+    public void validateNamespacePolicyOperation(NamespaceName namespaceName,
+                                                 PolicyName policy,
+                                                 PolicyOperation operation) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled()
+            && pulsar().getBrokerService().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId())) {
                 throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
             }
 
-            Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
-                    .allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal(), clientAppId(), clientAuthData());
+            boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+                    .allowNamespacePolicyOperation(namespaceName, policy, operation,
+                        originalPrincipal(), clientAppId(), clientAuthData());
 
             if (!isAuthorized) {
                 throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespacePolicyOperation for" +
@@ -842,20 +846,4 @@ public abstract class PulsarWebResource {
             }
         }
     }
-
-    public void validateTopicOperation(TopicName topicName, TopicOperation operation) {
-        if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
-            if (!isClientAuthenticated(clientAppId())) {
-                throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
-            }
-
-            Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
-                    .allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), clientAuthData());
-
-            if (!isAuthorized) {
-                throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for" +
-                        " operation [%s] on topic [%s]", operation.toString(), topicName));
-            }
-        }
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 94a02f6..fd56340 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -180,6 +181,8 @@ public class ServerCnxTest {
         doReturn(zkCache).when(pulsar).getLocalZkCacheService();
 
         brokerService = spy(new BrokerService(pulsar));
+        BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
+        doReturn(interceptor).when(brokerService).getInterceptor();
         doReturn(brokerService).when(pulsar).getBrokerService();
         doReturn(executor).when(pulsar).getOrderedExecutor();
 
@@ -474,7 +477,7 @@ public class ServerCnxTest {
     public void testProducerCommandWithAuthorizationPositive() throws Exception {
         AuthorizationService authorizationService = mock(AuthorizationService.class);
         doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         resetChannel();
@@ -605,7 +608,7 @@ public class ServerCnxTest {
     public void testProducerCommandWithAuthorizationNegative() throws Exception {
         AuthorizationService authorizationService = mock(AuthorizationService.class);
         doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1195,7 +1198,7 @@ public class ServerCnxTest {
     public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
         AuthorizationService authorizationService = mock(AuthorizationService.class);
         doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1217,7 +1220,7 @@ public class ServerCnxTest {
     public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
         AuthorizationService authorizationService = mock(AuthorizationService.class);
         doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 180142c..687b08d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -22,20 +22,18 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.net.URI;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import javax.naming.AuthenticationException;
 
 import lombok.Cleanup;
-
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
@@ -54,16 +52,12 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.util.RestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
 
@@ -435,7 +429,8 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         }
 
         @Override
-        public CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
+        public CompletableFuture<Boolean> isSuperUser(String role,
+                                                      ServiceConfiguration serviceConfiguration) {
             Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles();
             return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false);
         }
@@ -509,32 +504,38 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         }
 
         @Override
-        public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+        public CompletableFuture<Boolean> allowTenantOperationAsync(
+            String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
             return CompletableFuture.completedFuture(true);
         }
 
         @Override
-        public Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+        public Boolean allowTenantOperation(
+            String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
             return true;
         }
 
         @Override
-        public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+        public CompletableFuture<Boolean> allowNamespaceOperationAsync(
+            NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
             return CompletableFuture.completedFuture(true);
         }
 
         @Override
-        public Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+        public Boolean allowNamespaceOperation(
+            NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
             return null;
         }
 
         @Override
-        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+        public CompletableFuture<Boolean> allowTopicOperationAsync(
+            TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
             return CompletableFuture.completedFuture(true);
         }
 
         @Override
-        public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+        public Boolean allowTopicOperation(
+            TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) {
             return true;
         }
     }
@@ -566,18 +567,10 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
 
     public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
         @Override
-        public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
-            try {
-                return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
-            } catch (InterruptedException e) {
-                throw new RestException(e);
-            } catch (ExecutionException e) {
-                throw new RestException(e.getCause());
-            }
-        }
-
-        @Override
-        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
+                                                                   String role,
+                                                                   TopicOperation operation,
+                                                                   AuthenticationDataSource authData) {
             CompletableFuture<Boolean> future = new CompletableFuture<>();
             if (authData.hasSubscription()) {
                 String subscription = authData.getSubscription();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 56a933b..697ddf9 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -62,7 +62,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class AdminProxyHandler extends ProxyServlet {
+
     private static final Logger LOG = LoggerFactory.getLogger(AdminProxyHandler.class);
+
+    private static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";
+
     private static final Set<String> functionRoutes = new HashSet<>(Arrays.asList(
         "/admin/v3/function",
         "/admin/v2/function",
@@ -334,7 +338,7 @@ class AdminProxyHandler extends ProxyServlet {
         super.addProxyHeaders(clientRequest, proxyRequest);
         String user = (String) clientRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
         if (user != null) {
-            proxyRequest.header("X-Original-Principal", user);
+            proxyRequest.header(ORIGINAL_PRINCIPAL_HEADER, user);
         }
     }
 }