You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/05/18 23:05:32 UTC

[pulsar] branch master updated: [Issue 5720][authorization provider] Add granularity (#6428)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 029b6f4  [Issue 5720][authorization provider] Add granularity (#6428)
029b6f4 is described below

commit 029b6f4cae5b014d51c9894068590cf3904c5a23
Author: Alexandre DUVAL <ka...@gmail.com>
AuthorDate: Tue May 19 01:05:21 2020 +0200

    [Issue 5720][authorization provider] Add granularity (#6428)
    
    Fixes #5720
    
    ### Motivation
    
    Provide "real" authz abilities to pulsar resources.
    
    ### Modifications
    
    Add stuff to `AuthorizationProvider` interface, and use them on every pulsar resource management auth (tenant, namespace, topics, functions, connectors, ...)
---
 .../authentication/AuthenticationDataCommand.java  |  32 +++-
 .../authentication/AuthenticationDataSource.java   |  23 +++
 .../authorization/AuthorizationProvider.java       | 118 ++++++++++++++
 .../broker/authorization/AuthorizationService.java | 173 +++++++++++++++++++++
 .../authorization/PulsarAuthorizationProvider.java | 125 ++++++++++++++-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 153 +++++++++---------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  25 +--
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  28 ++--
 .../apache/pulsar/broker/service/ServerCnx.java    |  33 ++--
 .../pulsar/broker/web/PulsarWebResource.java       |  70 +++++++++
 .../apache/pulsar/broker/admin/NamespacesTest.java |  20 ++-
 .../pulsar/broker/service/ServerCnxTest.java       |  16 +-
 .../api/AuthorizationProducerConsumerTest.java     |  65 +++++++-
 .../service/web/DiscoveryServiceWebTest.java       |   8 +-
 .../common/policies/data/NamespaceOperation.java   |  41 +++++
 .../pulsar/common/policies/data/PolicyName.java    |  45 ++++++
 .../common/policies/data/PolicyOperation.java      |  28 ++++
 .../common/policies/data/TenantOperation.java      |  29 ++++
 .../common/policies/data/TopicOperation.java       |  50 ++++++
 19 files changed, 941 insertions(+), 141 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java
index 6539fb3..7299eae 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java
@@ -28,9 +28,22 @@ public class AuthenticationDataCommand implements AuthenticationDataSource {
     protected final String authData;
     protected final SocketAddress remoteAddress;
     protected final SSLSession sslSession;
+    protected String subscription;
 
     public AuthenticationDataCommand(String authData) {
-        this(authData, null, null);
+        this(authData, null, null, null);
+    }
+
+    public AuthenticationDataCommand(String authData, String subscription) {
+        this(authData, null, null, subscription);
+    }
+
+    public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession,
+                                     String subscription) {
+        this.authData = authData;
+        this.remoteAddress = remoteAddress;
+        this.sslSession = sslSession;
+        this.subscription = subscription;
     }
 
     public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession) {
@@ -85,4 +98,21 @@ public class AuthenticationDataCommand implements AuthenticationDataSource {
         }
     }
 
+    /*
+     * Subscription
+     */
+    @Override
+    public boolean hasSubscription() {
+        return this.subscription != null;
+    }
+
+    @Override
+    public void setSubscription(String subscription) {
+        this.subscription = subscription;
+    }
+
+    @Override
+    public String getSubscription() {
+        return subscription;
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
index b72b99b..eb9ed2b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
@@ -127,4 +127,27 @@ public interface AuthenticationDataSource {
     default SocketAddress getPeerAddress() {
         return null;
     }
+
+    /**
+     * Check if subscription is defined available.
+     *
+     * @return true if this authentication data contain subscription
+     */
+    default boolean hasSubscription() {
+        return false;
+    }
+
+    /**
+     * Subscription name can be necessary for consumption
+     *
+     * @return a <code>String</code> containing the subscription name
+     */
+    default String getSubscription() { return null; }
+
+    /**
+     * Subscription name can be necessary for consumption
+     *
+     * @return a <code>String</code> containing the subscription name
+     */
+    default void setSubscription(String subscription) { };
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 572e403..c4fcc5e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -29,7 +30,14 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
 
 /**
  * Provider of authorization mechanism
@@ -186,4 +194,114 @@ public interface AuthorizationProvider extends Closeable {
     CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
             String authDataJson);
 
+    /**
+     * Grant authorization-action permission on a tenant to the given client
+     * @param tenantName
+     * @param originalRole role not overriden by proxy role if request do pass through proxy
+     * @param role originalRole | proxyRole if the request didn't pass through proxy
+     * @param operation
+     * @param authData
+     * @return CompletableFuture<Boolean>
+     */
+    default CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
+                                                            TenantOperation operation,
+                                                            AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(new IllegalStateException(
+                String.format("allowTenantOperation(%s) on tenant %s is not supported by the Authorization" +
+                                " provider you are using.",
+                        operation.toString(), tenantName)));
+    }
+
+    default Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation,
+                                      AuthenticationDataSource authData) {
+        try {
+            return allowTenantOperationAsync(tenantName, originalRole, role, operation, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+    /**
+     * Grant authorization-action permission on a namespace to the given client
+     * @param namespaceName
+     * @param originalRole role not overriden by proxy role if request do pass through proxy
+     * @param role originalRole | proxyRole if the request didn't pass through proxy
+     * @param operation
+     * @param authData
+     * @return CompletableFuture<Boolean>
+     */
+    default CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
+                                                                 String role, NamespaceOperation operation,
+                                                                 AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+            new IllegalStateException("NamespaceOperation is not supported by the Authorization provider you are using."));
+    }
+
+    default Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role,
+                                         NamespaceOperation operation, AuthenticationDataSource authData) {
+        try {
+            return allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+    /**
+     * Grant authorization-action permission on a namespace to the given client
+     * @param namespaceName
+     * @param originalRole role not overriden by proxy role if request do pass through proxy
+     * @param role originalRole | proxyRole if the request didn't pass through proxy
+     * @param operation
+     * @param authData
+     * @return CompletableFuture<Boolean>
+     */
+    default CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+                                                                          PolicyOperation operation, String originalRole,
+                                                                          String role, AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+                new IllegalStateException("NamespacePolicyOperation is not supported by the Authorization provider you are using."));
+    }
+
+    default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation,
+                                                  String originalRole, String role, AuthenticationDataSource authData) {
+        try {
+            return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+
+    /**
+     * Grant authorization-action permission on a topic to the given client
+     * @param topic
+     * @param originalRole role not overriden by proxy role if request do pass through proxy
+     * @param role originalRole | proxyRole if the request didn't pass through proxy
+     * @param operation
+     * @param authData
+     * @return CompletableFuture<Boolean>
+     */
+    default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role,
+                                                             TopicOperation operation,
+                                                             AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+            new IllegalStateException("TopicOperation is not supported by the Authorization provider you are using."));
+    }
+
+    default Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation,
+                                     AuthenticationDataSource authData) {
+        try {
+            return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 3bf4458..e92ab84 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -21,12 +21,19 @@ package org.apache.pulsar.broker.authorization;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -318,4 +325,170 @@ public class AuthorizationService {
                                                        AuthenticationDataSource authenticationData) {
         return provider.allowFunctionOpsAsync(namespaceName, role, authenticationData);
     }
+
+    /**
+     * Grant authorization-action permission on a tenant to the given client
+     *
+     * @param tenantName
+     * @param operation
+     * @param originalRole
+     * @param role
+     * @param authData
+     *            additional authdata in json for targeted authorization provider
+     * @return IllegalArgumentException when tenant not found
+     * @throws IllegalStateException
+     *             when failed to grant permission
+     */
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, TenantOperation operation,
+                                                                 String originalRole, String role,
+                                                                 AuthenticationDataSource authData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        if (provider != null) {
+            return provider.allowTenantOperationAsync(tenantName, originalRole, role, operation, authData);
+        }
+
+        return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+                "allowTenantOperationAsync"));
+    }
+
+    public Boolean allowTenantOperation(String tenantName, TenantOperation operation, String orignalRole, String role,
+                                        AuthenticationDataSource authData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return true;
+        }
+
+        if (provider != null) {
+            return provider.allowTenantOperation(tenantName, orignalRole, role, operation, authData);
+        }
+
+        throw new IllegalStateException("No authorization provider configured for allowTenantOperation");
+    }
+
+    /**
+     * Grant authorization-action permission on a namespace to the given client
+     *
+     * @param namespaceName
+     * @param operation
+     * @param originalRole
+     * @param role
+     * @param authData
+     *            additional authdata in json for targeted authorization provider
+     * @return IllegalArgumentException when namespace not found
+     * @throws IllegalStateException
+     *             when failed to grant permission
+     */
+    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                   NamespaceOperation operation,
+                                                                   String originalRole, String role,
+                                                                   AuthenticationDataSource authData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        if (provider != null) {
+            return provider.allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData);
+        }
+
+        return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+                "allowNamespaceOperationAsync"));
+    }
+
+    public Boolean allowNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation,
+                                           String originalPrincipal, String role, AuthenticationDataSource authData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return true;
+        }
+
+        if (provider != null) {
+            return provider.allowNamespaceOperation(namespaceName, originalPrincipal, role, operation, authData);
+        }
+
+        throw new IllegalStateException("No authorization provider configured for allowNamespaceOperation");
+    }
+
+    /**
+     * Grant authorization-action permission on a namespace to the given client
+     *
+     * @param namespaceName
+     * @param operation
+     * @param originalRole
+     * @param role
+     * @param authData
+     *            additional authdata in json for targeted authorization provider
+     * @return IllegalArgumentException when namespace not found
+     * @throws IllegalStateException
+     *             when failed to grant permission
+     */
+    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+                                                                         PolicyOperation operation, String originalRole,
+                                                                         String role, AuthenticationDataSource authData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        if (provider != null) {
+            return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData);
+        }
+
+        return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+                "allowNamespacePolicyOperationAsync"));
+    }
+
+    public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy,
+                                                 PolicyOperation operation, String originalPrincipal, String role,
+                                                 AuthenticationDataHttps authData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return true;
+        }
+
+        if (provider != null) {
+            return provider.allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal, role, authData);
+        }
+
+        throw new IllegalStateException("No authorization provider configured for allowNamespacePolicyOperation");
+    }
+
+    /**
+     * Grant authorization-action permission on a topic to the given client
+     *
+     * @param topicName
+     * @param operation
+     * @param role
+     * @param authData
+     *            additional authdata in json for targeted authorization provider
+     * @return IllegalArgumentException when namespace not found
+     * @throws IllegalStateException
+     *             when failed to grant permission
+     */
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, TopicOperation operation,
+                                                               String originalRole, String role,
+                                                               AuthenticationDataSource authData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        if (provider != null) {
+            return provider.allowTopicOperationAsync(topicName, originalRole, role, operation, authData);
+        }
+
+        return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+                "allowTopicOperationAsync"));
+    }
+
+    public Boolean allowTopicOperation(TopicName topicName, TopicOperation operation,
+                                         String orignalRole, String role,
+                                         AuthenticationDataSource authData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return true;
+        }
+
+        if (provider != null) {
+            return provider.allowTopicOperation(topicName, orignalRole, role, operation, authData);
+        }
+
+        throw new IllegalStateException("No authorization provider configured for allowTopicOperation");
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index b025f80..40b2021 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -27,8 +27,9 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
+import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -36,8 +37,17 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.Policies;
 import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
+
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
@@ -46,7 +56,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
+import javax.ws.rs.core.Response;
 
 /**
  * Default authorization provider that stores authorization policies under local-zookeeper.
@@ -501,4 +511,115 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
         }
     }
 
+    @Override
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role,
+                                                           TenantOperation operation,
+                                                           AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(tenantName, originalRole, role, authData);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
+                                                              String role, NamespaceOperation operation,
+                                                              AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+                                                                         PolicyOperation operation, String originalRole,
+                                                                         String role, AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, String originalRole, String role,
+                                                               TopicOperation operation,
+                                                               AuthenticationDataSource authData) {
+        CompletableFuture<Boolean> isAuthorizedFuture;
+
+        switch (operation) {
+            case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, role, authData);
+                break;
+            case PRODUCE: isAuthorizedFuture= canProduceAsync(topicName, role, authData);
+                break;
+            case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription());
+                break;
+            default: isAuthorizedFuture = FutureUtil.failedFuture(
+                    new IllegalStateException("TopicOperation is not supported."));
+        }
+
+        CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, conf);
+
+        return isSuperUserFuture
+                .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> isSuperUser || isAuthorized);
+    }
+
+    private static String path(String... parts) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("/admin/");
+        Joiner.on('/').appendTo(sb, parts);
+        return sb.toString();
+    }
+
+    private CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName, String originalRole, String role,
+                                                                AuthenticationDataSource authData) {
+        try {
+            TenantInfo tenantInfo = configCache.propertiesCache()
+                    .get(path(POLICIES, tenantName))
+                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
+
+            validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+
+            if (role != null && conf.getProxyRoles().contains(role)) {
+                // role check
+                CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, conf);
+                CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
+                CompletableFuture<Boolean> isRoleAuthorizedFuture = isRoleSuperUserFuture
+                        .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
+                                isRoleSuperUser || isRoleTenantAdmin);
+
+                // originalRole check
+                CompletableFuture<Boolean> isOriginalRoleSuperUserFuture = isSuperUser(originalRole, conf);
+                CompletableFuture<Boolean> isOriginalRoleTenantAdminFuture = isTenantAdmin(tenantName, originalRole,
+                        tenantInfo, authData);
+                CompletableFuture<Boolean> isOriginalRoleAuthorizedFuture = isOriginalRoleSuperUserFuture
+                        .thenCombine(isOriginalRoleTenantAdminFuture, (isOriginalRoleSuperUser, isOriginalRoleTenantAdmin) ->
+                                isOriginalRoleSuperUser || isOriginalRoleTenantAdmin);
+
+                // merging
+                return isRoleAuthorizedFuture
+                        .thenCombine(isOriginalRoleAuthorizedFuture, (isRoleAuthorized, isOriginalRoleAuthorized) ->
+                                isRoleAuthorized && isOriginalRoleAuthorized);
+            } else {
+                // role check
+                CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, conf);
+                CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
+                return isRoleSuperUserFuture
+                        .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
+                                isRoleSuperUser || isRoleTenantAdmin);
+            }
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("Failed to get tenant info data for non existing tenant {}", tenantName);
+            throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
+        } catch (Exception e) {
+            log.error("Failed to get tenant {}", tenantName, e);
+            throw new RestException(e);
+        }
+    }
+
+    private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
+                                                  String originalPrincipal) {
+        if (proxyRoles.contains(authenticatedPrincipal)) {
+            // Request has come from a proxy
+            if (StringUtils.isBlank(originalPrincipal)) {
+                log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
+                throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
+            }
+            if (proxyRoles.contains(originalPrincipal)) {
+                log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
+                throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
+            }
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b0504ee..8a04adf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -80,15 +80,19 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.KeeperException;
@@ -103,7 +107,7 @@ public abstract class NamespacesBase extends AdminResource {
     private static final long MAX_BUNDLES = ((long) 1) << 32;
 
     protected List<String> internalGetTenantNamespaces(String tenant) {
-        validateAdminAccessForTenant(tenant);
+        validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
 
         try {
             return getListOfNamespaces(tenant);
@@ -117,9 +121,8 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalCreateNamespace(Policies policies) {
+        validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
         validatePoliciesReadOnlyAccess();
-        validateAdminAccessForTenant(namespaceName.getTenant());
-
         validatePolicies(namespaceName, policies);
 
         try {
@@ -138,7 +141,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
         validatePoliciesReadOnlyAccess();
 
         // ensure that non-global namespace is directed to the correct cluster
@@ -282,7 +285,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
         validatePoliciesReadOnlyAccess();
 
         // ensure that non-global namespace is directed to the correct cluster
@@ -353,7 +356,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
 
         try {
             AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -384,7 +387,7 @@ public abstract class NamespacesBase extends AdminResource {
 
 
     protected void internalGrantPermissionOnSubscription(String subscription, Set<String> roles) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
 
         try {
             AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -414,7 +417,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalRevokePermissionsOnNamespace(String role) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -444,7 +447,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalRevokePermissionsOnSubscription(String subscriptionName, String role) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
         validatePoliciesReadOnlyAccess();
 
         AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -457,6 +460,8 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected Set<String> internalGetNamespaceReplicationClusters() {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ);
+
         if (!namespaceName.isGlobal()) {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "Cannot get the replication clusters for a non-global namespace");
@@ -467,7 +472,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetNamespaceReplicationClusters(List<String> clusterIds) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
@@ -525,7 +530,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetNamespaceMessageTTL(int messageTTL) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.TTL, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         if (messageTTL < 0) {
@@ -820,7 +825,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalModifyDeduplication(boolean enableDeduplication) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         Entry<Policies, Stat> policiesNode = null;
@@ -856,9 +861,8 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     protected void internalUnloadNamespace(AsyncResponse asyncResponse) {
-        log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName);
-
         validateSuperUserAccess();
+        log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName);
 
         if (namespaceName.isGlobal()) {
             // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
@@ -903,11 +907,10 @@ public abstract class NamespacesBase extends AdminResource {
 
     
     protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffinityGroup) {
+        validateSuperUserAccess();
         log.info("[{}] Setting bookie-affinity-group {} for namespace {}", clientAppId(), bookieAffinityGroup,
                 this.namespaceName);
 
-        validateSuperUserAccess();
-
         if (namespaceName.isGlobal()) {
             // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
             validateGlobalNamespaceOwnership(namespaceName);
@@ -994,9 +997,9 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritative) {
+        validateSuperUserAccess();
         log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
 
-        validateSuperUserAccess();
         Policies policies = getNamespacePolicies(namespaceName);
 
         NamespaceBundle bundle = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName.toString(), bundleRange);
@@ -1042,9 +1045,9 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
+        validateSuperUserAccess();
         log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
 
-        validateSuperUserAccess();
         Policies policies = getNamespacePolicies(namespaceName);
 
         if (namespaceName.isGlobal()) {
@@ -1095,8 +1098,8 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
-        log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
         validateSuperUserAccess();
+        log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
 
         Entry<Policies, Stat> policiesNode = null;
 
@@ -1132,7 +1135,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected PublishRate internalGetPublishRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         PublishRate publishRate = policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
@@ -1146,8 +1149,8 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
-        log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
         validateSuperUserAccess();
+        log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
 
         Entry<Policies, Stat> policiesNode = null;
 
@@ -1185,7 +1188,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     protected DispatchRate internalGetTopicDispatchRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
@@ -1201,8 +1204,8 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
-        log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
         validateSuperUserAccess();
+        log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
 
         Entry<Policies, Stat> policiesNode = null;
 
@@ -1238,7 +1241,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected DispatchRate internalGetSubscriptionDispatchRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         DispatchRate dispatchRate = policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
@@ -1251,9 +1254,10 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
-        log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
         validateSuperUserAccess();
 
+        log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
+
         Entry<Policies, Stat> policiesNode = null;
 
         try {
@@ -1288,7 +1292,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected SubscribeRate internalGetSubscribeRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
         Policies policies = getNamespacePolicies(namespaceName);
         SubscribeRate subscribeRate = policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
         if (subscribeRate != null) {
@@ -1300,8 +1304,8 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
-        log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
         validateSuperUserAccess();
+        log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
 
         Entry<Policies, Stat> policiesNode = null;
 
@@ -1337,7 +1341,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected DispatchRate internalGetReplicatorDispatchRate() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         DispatchRate dispatchRate = policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
@@ -1350,7 +1354,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         if (backlogQuotaType == null) {
@@ -1397,7 +1401,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         if (backlogQuotaType == null) {
@@ -1430,7 +1434,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetRetention(RetentionPolicies retention) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -1468,7 +1472,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetPersistence(PersistencePolicies persistence) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
         validatePersistencePolicies(persistence);
 
@@ -1499,7 +1503,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected PersistencePolicies internalGetPersistence() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         if (policies.persistence == null) {
@@ -1511,7 +1515,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
 
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         try {
@@ -1553,7 +1557,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
 
         Policies policies = getNamespacePolicies(namespaceName);
 
@@ -1574,7 +1578,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
             boolean authoritative) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
 
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         try {
@@ -1617,7 +1621,7 @@ public abstract class NamespacesBase extends AdminResource {
     @SuppressWarnings("deprecation")
     protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
             boolean authoritative) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
 
         Policies policies = getNamespacePolicies(namespaceName);
 
@@ -1638,7 +1642,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
             boolean authoritative) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
 
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         try {
@@ -1680,7 +1684,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     @SuppressWarnings("deprecation")
     protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
 
         Policies policies = getNamespacePolicies(namespaceName);
 
@@ -1700,7 +1704,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscriptionAuthMode) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         if (subscriptionAuthMode == null) {
@@ -1736,7 +1740,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalModifyEncryptionRequired(boolean encryptionRequired) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         Entry<Policies, Stat> policiesNode = null;
@@ -1772,7 +1776,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         if (policies.delayed_delivery_policies == null) {
@@ -1785,7 +1789,6 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
         validateSuperUserAccess();
-        validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -1818,7 +1821,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName);
@@ -1859,12 +1862,12 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected String internalGetNamespaceAntiAffinityGroup() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).antiAffinityGroup;
     }
 
     protected void internalRemoveNamespaceAntiAffinityGroup() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName);
@@ -1895,7 +1898,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
             String tenant) {
-        validateAdminAccessForTenant(tenant);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
 
         log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster);
 
@@ -1947,7 +1950,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected RetentionPolicies internalGetRetention() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         if (policies.retention_policies == null) {
@@ -2140,12 +2143,12 @@ public abstract class NamespacesBase extends AdminResource {
 
 
     protected int internalGetMaxProducersPerTopic() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_producers_per_topic;
     }
 
     protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -2181,12 +2184,12 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected int internalGetMaxConsumersPerTopic() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_consumers_per_topic;
     }
 
     protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -2222,12 +2225,12 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected int internalGetMaxConsumersPerSubscription() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_consumers_per_subscription;
     }
 
     protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -2263,12 +2266,12 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected int internalGetMaxUnackedMessagesPerConsumer() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer;
     }
 
     protected void internalSetMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -2304,12 +2307,12 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected int internalGetMaxUnackedMessagesPerSubscription() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription;
     }
 
     protected void internalSetMaxUnackedMessagesPerSubscription(int maxUnackedMessagesPerSubscription) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -2345,12 +2348,12 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected long internalGetCompactionThreshold() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).compaction_threshold;
     }
 
     protected void internalSetCompactionThreshold(long newThreshold) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -2386,7 +2389,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected long internalGetOffloadThreshold() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
         Policies policies = getNamespacePolicies(namespaceName);
         if (policies.offload_policies == null) {
             return policies.offload_threshold;
@@ -2396,7 +2399,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetOffloadThreshold(long newThreshold) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -2433,7 +2436,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected Long internalGetOffloadDeletionLag() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
         Policies policies = getNamespacePolicies(namespaceName);
         if (policies.offload_policies == null) {
             return policies.offload_deletion_lag_ms;
@@ -2443,7 +2446,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
@@ -2481,12 +2484,12 @@ public abstract class NamespacesBase extends AdminResource {
 
     @Deprecated
     protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
     }
 
     protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
         Policies policies = getNamespacePolicies(namespaceName);
         SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
         if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED){
@@ -2498,7 +2501,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     @Deprecated
     protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         mutatePolicy((policies) -> {
@@ -2509,7 +2512,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         mutatePolicy((policies) -> {
@@ -2520,13 +2523,12 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected boolean internalGetSchemaValidationEnforced() {
-        validateSuperUserAccess();
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).schema_validation_enforced;
     }
 
     protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         mutatePolicy((policies) -> {
@@ -2537,13 +2539,12 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected boolean internalGetIsAllowAutoUpdateSchema() {
-        validateSuperUserAccess();
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).is_allow_auto_update_schema;
     }
 
     protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema) {
-        validateSuperUserAccess();
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         mutatePolicy((policies) -> {
@@ -2586,7 +2587,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPolicies offloadPolicies) {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
         validateOffloadPolicies(offloadPolicies);
 
@@ -2663,7 +2664,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected OffloadPolicies internalGetOffloadPolicies() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         return policies.offload_policies;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 700381d..c9e6def 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -35,12 +36,16 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,7 +94,7 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Property or cluster doesn't exist") })
     public List<String> getNamespacesForCluster(@PathParam("property") String property,
             @PathParam("cluster") String cluster) {
-        validateAdminAccessForTenant(property);
+        validateTenantOperation(property, TenantOperation.LIST_NAMESPACES);
         List<String> namespaces = Lists.newArrayList();
         if (!clusters().contains(cluster)) {
             log.warn("[{}] Failed to get namespace list for property: {}/{} - Cluster does not exist", clientAppId(),
@@ -121,16 +126,14 @@ public class Namespaces extends NamespacesBase {
                                   @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
                                   @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
                                   @Suspended AsyncResponse asyncResponse) {
-        validateAdminAccessForTenant(property);
         validateNamespaceName(property, cluster, namespace);
+        validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS);
 
         // Validate that namespace exists, throws 404 if it doesn't exist
         getNamespacePolicies(namespaceName);
 
         pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
-                .thenAccept(topics -> {
-                    asyncResponse.resume(topics);
-                })
+                .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     log.error("Failed to get topics list for namespace {}", namespaceName, ex);
                     asyncResponse.resume(ex);
@@ -145,8 +148,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
     public Policies getPolicies(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(property);
         validateNamespaceName(property, cluster, namespace);
+        validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.ALL, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName);
     }
 
@@ -228,8 +231,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 409, message = "Namespace is not empty") })
     public Map<String, Set<AuthAction>> getPermissions(@PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(property);
         validateNamespaceName(property, cluster, namespace);
+        validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION);
 
         Policies policies = getNamespacePolicies(namespaceName);
         return policies.auth_policies.namespace_auth;
@@ -294,8 +297,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 412, message = "Namespace is not global") })
     public Set<String> getNamespaceReplicationClusters(@PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(property);
         validateNamespaceName(property, cluster, namespace);
+        validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.REPLICATION, PolicyOperation.READ);
 
         return internalGetNamespaceReplicationClusters();
     }
@@ -320,8 +323,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
     public int getNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(property);
         validateNamespaceName(property, cluster, namespace);
+        validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.TTL, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         return policies.message_ttl_in_seconds;
@@ -505,9 +508,9 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
     public BundlesData getBundlesData(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(property);
         validatePoliciesReadOnlyAccess();
         validateNamespaceName(property, cluster, namespace);
+        validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_BUNDLE);
 
         Policies policies = getNamespacePolicies(namespaceName);
 
@@ -638,8 +641,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Namespace does not exist") })
     public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(property);
         validateNamespaceName(property, cluster, namespace);
+        validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.BACKLOG, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         return policies.backlog_quota_map;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 95cf65b..4f123a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -53,9 +54,12 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -64,6 +68,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 
+import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,16 +96,14 @@ public class Namespaces extends NamespacesBase {
                                   @PathParam("namespace") String namespace,
                                   @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
                                   @Suspended AsyncResponse asyncResponse) {
-        validateAdminAccessForTenant(tenant);
         validateNamespaceName(tenant, namespace);
+        validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS);
 
         // Validate that namespace exists, throws 404 if it doesn't exist
         getNamespacePolicies(namespaceName);
 
         pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
-                .thenAccept(topics -> {
-                    asyncResponse.resume(topics);
-                })
+                .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     log.error("Failed to get topics list for namespace {}", namespaceName, ex);
                     asyncResponse.resume(ex);
@@ -114,8 +117,8 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
     public Policies getPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(tenant);
         validateNamespaceName(tenant, namespace);
+        validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.ALL, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName);
     }
 
@@ -129,7 +132,7 @@ public class Namespaces extends NamespacesBase {
     public void createNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             Policies policies) {
         validateNamespaceName(tenant, namespace);
-
+        validateTenantOperation(tenant, TenantOperation.CREATE_NAMESPACE);
         policies = getDefaultPolicesIfNull(policies);
         internalCreateNamespace(policies);
     }
@@ -178,8 +181,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 409, message = "Namespace is not empty") })
     public Map<String, Set<AuthAction>> getPermissions(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(tenant);
         validateNamespaceName(tenant, namespace);
+        validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_PERMISSION);
 
         Policies policies = getNamespacePolicies(namespaceName);
         return policies.auth_policies.namespace_auth;
@@ -244,9 +247,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 412, message = "Namespace is not global") })
     public Set<String> getNamespaceReplicationClusters(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(tenant);
         validateNamespaceName(tenant, namespace);
-
+        validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.REPLICATION, PolicyOperation.READ);
         return internalGetNamespaceReplicationClusters();
     }
 
@@ -270,9 +272,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
     public int getNamespaceMessageTTL(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
-
-        validateAdminAccessForTenant(tenant);
         validateNamespaceName(tenant, namespace);
+        validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.TTL, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
         return policies.message_ttl_in_seconds;
@@ -410,9 +411,9 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
     public BundlesData getBundlesData(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(tenant);
         validatePoliciesReadOnlyAccess();
         validateNamespaceName(tenant, namespace);
+        validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_BUNDLE);
 
         Policies policies = getNamespacePolicies(namespaceName);
 
@@ -584,9 +585,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Namespace does not exist") })
     public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
-        validateAdminAccessForTenant(tenant);
         validateNamespaceName(tenant, namespace);
-
+        validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.BACKLOG, PolicyOperation.READ);
         Policies policies = getNamespacePolicies(namespaceName);
         return policies.backlog_quota_map;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1eefc37..bc04f33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -59,6 +59,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -77,6 +78,7 @@ import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.protocol.CommandUtils;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
@@ -292,8 +294,8 @@ public class ServerCnx extends PulsarHandler {
             }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-                isProxyAuthorizedFuture = service.getAuthorizationService().canLookupAsync(topicName, authRole,
-                    authenticationData);
+                isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+                        TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
             } else {
                 isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
             }
@@ -364,8 +366,8 @@ public class ServerCnx extends PulsarHandler {
             }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-                isProxyAuthorizedFuture = service.getAuthorizationService()
-                        .canLookupAsync(topicName, authRole, authenticationData);
+                isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+                        TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
             } else {
                 isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
             }
@@ -760,8 +762,9 @@ public class ServerCnx extends PulsarHandler {
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            isProxyAuthorizedFuture = service.getAuthorizationService().canConsumeAsync(topicName, authRole,
-                    authenticationData, subscribe.getSubscription());
+            authenticationData.setSubscription(subscriptionName);
+            isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+                    TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
         } else {
             isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
         }
@@ -769,9 +772,13 @@ public class ServerCnx extends PulsarHandler {
             if (isProxyAuthorized) {
                 CompletableFuture<Boolean> authorizationFuture;
                 if (service.isAuthorizationEnabled()) {
-                    authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
-                            originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
-                            subscriptionName);
+                    if (authenticationData == null) {
+                        authenticationData = new AuthenticationDataCommand("", subscriptionName);
+                    } else {
+                        authenticationData.setSubscription(subscriptionName);
+                    }
+                    authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+                            TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
                 } else {
                     authorizationFuture = CompletableFuture.completedFuture(true);
                 }
@@ -979,8 +986,8 @@ public class ServerCnx extends PulsarHandler {
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            isProxyAuthorizedFuture = service.getAuthorizationService().canProduceAsync(topicName,
-                    authRole, authenticationData);
+            isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+                    TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
         } else {
             isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
         }
@@ -988,8 +995,8 @@ public class ServerCnx extends PulsarHandler {
             if (isProxyAuthorized) {
                 CompletableFuture<Boolean> authorizationFuture;
                 if (service.isAuthorizationEnabled()) {
-                    authorizationFuture = service.getAuthorizationService().canProduceAsync(topicName,
-                            originalPrincipal != null ? originalPrincipal : authRole, authenticationData);
+                    authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+                            TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
                 } else {
                     authorizationFuture = CompletableFuture.completedFuture(true);
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 2273d1d..96569e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -61,8 +61,13 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -771,4 +776,69 @@ public abstract class PulsarWebResource {
     // Non-Usual HTTP error codes
     protected static final int NOT_IMPLEMENTED = 501;
 
+    public void validateTenantOperation(String tenant, TenantOperation operation) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+            if (!isClientAuthenticated(clientAppId())) {
+                throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
+            }
+
+            Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+                    .allowTenantOperation(
+                            tenant, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+            if (!isAuthorized) {
+                throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTenantOperation for" +
+                                " originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]",
+                        originalPrincipal(), clientAppId(), operation.toString(), tenant));
+            }
+        }
+    }
+
+    public void validateNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+            if (!isClientAuthenticated(clientAppId())) {
+                throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+            }
+
+            Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+                    .allowNamespaceOperation(namespaceName, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+            if (!isAuthorized) {
+                throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespaceOperation for" +
+                        " operation [%s] on namespace [%s]", operation.toString(), namespaceName));
+            }
+        }
+    }
+
+    public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+            if (!isClientAuthenticated(clientAppId())) {
+                throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+            }
+
+            Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+                    .allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+            if (!isAuthorized) {
+                throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespacePolicyOperation for" +
+                        " operation [%s] on namespace [%s] on policy [%s]", operation.toString(), namespaceName, policy.toString()));
+            }
+        }
+    }
+
+    public void validateTopicOperation(TopicName topicName, TopicOperation operation) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+            if (!isClientAuthenticated(clientAppId())) {
+                throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
+            }
+
+            Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+                    .allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+            if (!isAuthorized) {
+                throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for" +
+                        " operation [%s] on topic [%s]", operation.toString(), topicName));
+            }
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index f98758c..2df24bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -81,9 +81,12 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -154,9 +157,6 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(null).when(namespaces).originalPrincipal();
         doReturn(null).when(namespaces).clientAuthData();
         doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
-        doNothing().when(namespaces).validateAdminAccessForTenant(this.testTenant);
-        doNothing().when(namespaces).validateAdminAccessForTenant("non-existing-tenant");
-        doNothing().when(namespaces).validateAdminAccessForTenant("new-property");
 
         admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:8080"));
         admin.clusters().createCluster("usw", new ClusterData("http://broker-usw.com:8080"));
@@ -171,7 +171,15 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 new BundlesData());
 
         doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
-                .validateAdminAccessForTenant(this.testOtherTenant);
+                .validateTenantOperation(this.testOtherTenant, null);
+
+        doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+                .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+                        PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+
+        doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+                .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+                        PolicyName.REPLICATION, PolicyOperation.WRITE);
 
         nsSvc = pulsar.getNamespaceService();
     }
@@ -878,7 +886,6 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testValidateAdminAccessOnTenant() throws Exception {
-
         try {
             final String property = "prop";
             pulsar.getConfiguration().setAuthenticationEnabled(true);
@@ -888,7 +895,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                     new TenantInfo(Sets.newHashSet(namespaces.clientAppId()), Sets.newHashSet("use")));
             ZkUtils.createFullPathOptimistic(pulsar.getConfigurationCache().getZooKeeper(), path, data.getBytes(),
                     ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            namespaces.validateAdminAccessForTenant(property);
+
+            namespaces.validateTenantOperation(property, null);
         } finally {
             pulsar.getConfiguration().setAuthenticationEnabled(false);
             pulsar.getConfiguration().setAuthorizationEnabled(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index cb913fb..aa71801 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -478,8 +478,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testProducerCommandWithAuthorizationPositive() throws Exception {
         AuthorizationService authorizationService = mock(AuthorizationService.class);
-        doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canProduceAsync(Mockito.any(),
-                Mockito.any(), Mockito.any());
+        doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         resetChannel();
@@ -605,8 +605,8 @@ public class ServerCnxTest {
 
     public void testProducerCommandWithAuthorizationNegative() throws Exception {
         AuthorizationService authorizationService = mock(AuthorizationService.class);
-        doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canProduceAsync(Mockito.any(),
-                Mockito.any(), Mockito.any());
+        doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1195,8 +1195,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
         AuthorizationService authorizationService = mock(AuthorizationService.class);
-        doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canConsumeAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any());
+        doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1217,8 +1217,8 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
         AuthorizationService authorizationService = mock(AuthorizationService.class);
-        doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canConsumeAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any());
+        doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 7b324c0..ddeed96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import javax.naming.AuthenticationException;
@@ -49,7 +50,11 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.RestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -487,6 +492,41 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
                 String subscriptionName, String role, String authDataJson) {
             return CompletableFuture.completedFuture(null);
         }
+
+        @Override
+        public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+            return true;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+            return true;
+        }
     }
 
     /**
@@ -515,21 +555,32 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
     }
 
     public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
+        @Override
+        public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+            try {
+                return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
+            } catch (InterruptedException e) {
+                throw new RestException(e);
+            } catch (ExecutionException e) {
+                throw new RestException(e.getCause());
+            }
+        }
 
         @Override
-        public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
-                AuthenticationDataSource authenticationData, String subscription) {
+        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
             CompletableFuture<Boolean> future = new CompletableFuture<>();
-            if (isNotBlank(subscription)) {
-                if (!subscription.startsWith(role)) {
-                    future.completeExceptionally(new PulsarServerException(
-                            "The subscription name needs to be prefixed by the authentication role"));
+            if (authData.hasSubscription()) {
+                String subscription = authData.getSubscription();
+                if (isNotBlank(subscription)) {
+                    if (!subscription.startsWith(role)) {
+                        future.completeExceptionally(new PulsarServerException(
+                                "The subscription name needs to be prefixed by the authentication role"));
+                    }
                 }
             }
             future.complete(clientRole.equals(role));
             return future;
         }
-
     }
 
     public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
index cc3cb3b..ec9d4e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
@@ -61,6 +61,8 @@ public class DiscoveryServiceWebTest extends ProducerConsumerBase {
     protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
+        super.conf.setAuthorizationEnabled(true);
+        super.conf.setAuthenticationEnabled(true);
     }
 
     @AfterMethod
@@ -99,9 +101,9 @@ public class DiscoveryServiceWebTest extends ProducerConsumerBase {
          **/
 
         assertEquals(hitBrokerService(HttpMethod.POST, postRequestUrl, Lists.newArrayList("use")),
-                "Tenant does not exist");
-        assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Tenant does not exist");
-        assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Tenant does not exist");
+                "Need to authenticate to perform the request");
+        assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Need to authenticate to perform the request");
+        assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Need to authenticate to perform the request");
 
         server.stop();
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
new file mode 100644
index 0000000..bda93c4
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Namespace authorization operations.
+ */
+public enum NamespaceOperation {
+    CREATE_TOPIC,
+    GET_TOPIC,
+    GET_TOPICS,
+    DELETE_TOPIC,
+
+    ADD_BUNDLE,
+    DELETE_BUNDLE,
+    GET_BUNDLE,
+
+    GET_PERMISSION,
+    GRANT_PERMISSION,
+    REVOKE_PERMISSION,
+
+    CLEAR_BACKLOG,
+    UNSUBSCRIBE,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
new file mode 100644
index 0000000..439ed7b
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * PolicyName authorization operations.
+ */
+public enum PolicyName {
+    ALL,
+    ANTI_AFFINITY,
+    BACKLOG,
+    COMPACTION,
+    DELAYED_DELIVERY,
+    DEDUPLICATION,
+    MAX_CONSUMERS,
+    MAX_PRODUCERS,
+    MAX_UNACKED,
+    OFFLOAD,
+    PERSISTENCE,
+    RATE,
+    RETENTION,
+    REPLICATION,
+    REPLICATION_RATE,
+    SCHEMA_COMPATIBILITY_STRATEGY,
+    SUBSCRIPTION_AUTH_MODE,
+    ENCRYPTION,
+    TTL,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyOperation.java
new file mode 100644
index 0000000..ce70341
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyOperation.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * PolicyOperation authorization operations.
+ */
+public enum PolicyOperation {
+    READ,
+    WRITE,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java
new file mode 100644
index 0000000..b444433
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Tenant authorization operations.
+ */
+public enum TenantOperation {
+    CREATE_NAMESPACE,
+    DELETE_NAMESPACE,
+    LIST_NAMESPACES,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
new file mode 100644
index 0000000..7e54cca
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Topic authorization operations.
+ */
+public enum TopicOperation {
+    LOOKUP,
+    PRODUCE,
+    CONSUME,
+
+    COMPACT,
+    EXPIRE_MESSAGES,
+    OFFLOAD,
+    PEEK_MESSAGES,
+    RESET_CURSOR,
+    SKIP,
+    TERMINATE,
+    UNLOAD,
+
+    GRANT_PERMISSION,
+    GET_PERMISSION,
+    REVOKE_PERMISSION,
+
+    ADD_BUNDLE_RANGE,
+    GET_BUNDLE_RANGE,
+    DELETE_BUNDLE_RANGE,
+
+    SUBSCRIBE,
+    GET_SUBSCRIPTIONS,
+    UNSUBSCRIBE,
+}