You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/16 08:14:45 UTC

[pulsar] branch master updated: [improve][broker] Make some methods in TenantsBase async. (#15603)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fedf63a436 [improve][broker] Make some methods in TenantsBase async. (#15603)
5fedf63a436 is described below

commit 5fedf63a436792636e4f58b444b2e0dc3dab8746
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon May 16 16:14:37 2022 +0800

    [improve][broker] Make some methods in TenantsBase async. (#15603)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  15 -
 .../pulsar/broker/admin/impl/TenantsBase.java      | 345 ++++++++-------------
 .../pulsar/broker/web/PulsarWebResource.java       |  27 ++
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  12 +-
 4 files changed, 169 insertions(+), 230 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3a396411c63..7f77568b8bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -40,10 +40,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
@@ -741,19 +739,6 @@ public abstract class AdminResource extends PulsarWebResource {
         return future;
     }
 
-    protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
-        Throwable realCause = FutureUtil.unwrapCompletionException(exception);
-        if (realCause instanceof WebApplicationException) {
-            asyncResponse.resume(realCause);
-        } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
-            asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
-        } else if (realCause instanceof PulsarAdminException) {
-            asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
-        } else {
-            asyncResponse.resume(new RestException(realCause));
-        }
-    }
-
     protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
         return validateTopicPolicyOperationAsync(topicName,
                 PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index 4f1bad29736..045cafbfd53 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.pulsar.common.naming.Constants.GLOBAL_CLUSTER;
 import com.google.common.collect.Lists;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -45,8 +46,7 @@ import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.naming.Constants;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -64,23 +64,18 @@ public class TenantsBase extends PulsarWebResource {
             @ApiResponse(code = 404, message = "Tenant doesn't exist")})
     public void getTenants(@Suspended final AsyncResponse asyncResponse) {
         final String clientAppId = clientAppId();
-        try {
-            validateSuperUserAccess();
-        } catch (Exception e) {
-            asyncResponse.resume(e);
-            return;
-        }
-        tenantResources().listTenantsAsync().whenComplete((tenants, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to get tenants list", clientAppId, e);
-                asyncResponse.resume(new RestException(e));
-                return;
-            }
-            // deep copy the tenants to avoid concurrent sort exception
-            List<String> deepCopy = new ArrayList<>(tenants);
-            deepCopy.sort(null);
-            asyncResponse.resume(deepCopy);
-        });
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> tenantResources().listTenantsAsync())
+                .thenAccept(tenants -> {
+                    // deep copy the tenants to avoid concurrent sort exception
+                    List<String> deepCopy = new ArrayList<>(tenants);
+                    deepCopy.sort(null);
+                    asyncResponse.resume(deepCopy);
+                }).exceptionally(ex -> {
+                    log.error("[{}] Failed to get tenants list", clientAppId, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -91,22 +86,20 @@ public class TenantsBase extends PulsarWebResource {
     public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant) {
         final String clientAppId = clientAppId();
-        try {
-            validateSuperUserAccess();
-        } catch (Exception e) {
-            asyncResponse.resume(e);
-        }
-
-        tenantResources().getTenantAsync(tenant).whenComplete((tenantInfo, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to get Tenant {}", clientAppId, e.getMessage());
-                asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get Tenant"));
-                return;
-            }
-            boolean response = tenantInfo.isPresent() ? asyncResponse.resume(tenantInfo.get())
-                    : asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant does not exist"));
-            return;
-        });
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> tenantResources().getTenantAsync(tenant))
+                .thenApply(tenantInfo -> {
+                    if (!tenantInfo.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
+                    }
+                    return tenantInfo.get();
+                })
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get tenant admin {}", clientAppId, tenant, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -120,58 +113,44 @@ public class TenantsBase extends PulsarWebResource {
     public void createTenant(@Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant,
             @ApiParam(value = "TenantInfo") TenantInfoImpl tenantInfo) {
-
         final String clientAppId = clientAppId();
         try {
-            validateSuperUserAccess();
-            validatePoliciesReadOnlyAccess();
-            validateClusters(tenantInfo);
             NamedEntity.checkName(tenant);
         } catch (IllegalArgumentException e) {
-            log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId(), tenant, e);
+            log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId, tenant, e);
             asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
             return;
-        } catch (Exception e) {
-            asyncResponse.resume(e);
-            return;
         }
-
-        tenantResources().listTenantsAsync().whenComplete((tenants, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to create tenant ", clientAppId, e.getCause());
-                asyncResponse.resume(new RestException(e));
-                return;
-            }
-
-            int maxTenants = pulsar().getConfiguration().getMaxTenants();
-            // Due to the cost of distributed locks, no locks are added here.
-            // In a concurrent scenario, the threshold will be exceeded.
-            if (maxTenants > 0) {
-                if (tenants != null && tenants.size() >= maxTenants) {
-                    asyncResponse.resume(
-                            new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants"));
-                    return;
-                }
-            }
-            tenantResources().tenantExistsAsync(tenant).thenAccept(exist -> {
-                if (exist) {
-                    asyncResponse.resume(new RestException(Status.CONFLICT, "Tenant already exist"));
-                    return;
-                }
-                tenantResources().createTenantAsync(tenant, tenantInfo).thenAccept((r) -> {
-                    log.info("[{}] Created tenant {}", clientAppId(), tenant);
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> validateClustersAsync(tenantInfo))
+                .thenCompose(__ -> tenantResources().listTenantsAsync())
+                .thenAccept(tenants -> {
+                    int maxTenants = pulsar().getConfiguration().getMaxTenants();
+                    // Due to the cost of distributed locks, no locks are added here.
+                    // In a concurrent scenario, the threshold will be exceeded.
+                    if (maxTenants > 0) {
+                        if (tenants != null && tenants.size() >= maxTenants) {
+                            throw new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants");
+                        }
+                    }
+                })
+                .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
+                .thenAccept(exist -> {
+                    if (exist) {
+                        throw new RestException(Status.CONFLICT, "Tenant already exist");
+                    }
+                })
+                .thenCompose(__ -> tenantResources().createTenantAsync(tenant, tenantInfo))
+                .thenAccept(__ -> {
+                    log.info("[{}] Created tenant {}", clientAppId, tenant);
                     asyncResponse.resume(Response.noContent().build());
-                }).exceptionally(ex -> {
+                })
+                .exceptionally(ex -> {
                     log.error("[{}] Failed to create tenant {}", clientAppId, tenant, ex);
-                    asyncResponse.resume(new RestException(ex));
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
                 });
-            }).exceptionally(ex -> {
-                log.error("[{}] Failed to create tenant {}", clientAppId(), tenant, ex);
-                asyncResponse.resume(new RestException(ex));
-                return null;
-            });
-        });
     }
 
     @POST
@@ -186,42 +165,28 @@ public class TenantsBase extends PulsarWebResource {
     public void updateTenant(@Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant,
             @ApiParam(value = "TenantInfo") TenantInfoImpl newTenantAdmin) {
-        try {
-            validateSuperUserAccess();
-            validatePoliciesReadOnlyAccess();
-            validateClusters(newTenantAdmin);
-        } catch (Exception e) {
-            asyncResponse.resume(e);
-            return;
-        }
-
-        final String clientAddId = clientAppId();
-        tenantResources().getTenantAsync(tenant).thenAccept(tenantAdmin -> {
-            if (!tenantAdmin.isPresent()) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant " + tenant + " not found"));
-                return;
-            }
-            TenantInfo oldTenantAdmin = tenantAdmin.get();
-            Set<String> newClusters = new HashSet<>(newTenantAdmin.getAllowedClusters());
-            canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), newClusters).thenApply(r -> {
-                tenantResources().updateTenantAsync(tenant, old -> newTenantAdmin).thenAccept(done -> {
-                    log.info("Successfully updated tenant info {}", tenant);
+        final String clientAppId = clientAppId();
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> validateClustersAsync(newTenantAdmin))
+                .thenCompose(__ -> tenantResources().getTenantAsync(tenant))
+                .thenCompose(tenantAdmin -> {
+                    if (!tenantAdmin.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant " + tenant + " not found");
+                    }
+                    TenantInfo oldTenantAdmin = tenantAdmin.get();
+                    Set<String> newClusters = new HashSet<>(newTenantAdmin.getAllowedClusters());
+                    return canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), newClusters);
+                })
+                .thenCompose(__ -> tenantResources().updateTenantAsync(tenant, old -> newTenantAdmin))
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully updated tenant info {}", clientAppId, tenant);
                     asyncResponse.resume(Response.noContent().build());
                 }).exceptionally(ex -> {
-                    log.warn("Failed to update tenant {}", tenant, ex.getCause());
-                    asyncResponse.resume(new RestException(ex));
+                    log.warn("[{}] Failed to update tenant {}", clientAppId, tenant, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
                 });
-                return null;
-            }).exceptionally(nsEx -> {
-                asyncResponse.resume(nsEx.getCause());
-                return null;
-            });
-        }).exceptionally(ex -> {
-            log.error("[{}] Failed to get tenant {}", clientAddId, tenant, ex.getCause());
-            asyncResponse.resume(new RestException(ex));
-            return null;
-        });
     }
 
     @DELETE
@@ -234,127 +199,89 @@ public class TenantsBase extends PulsarWebResource {
     public void deleteTenant(@Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") @ApiParam(value = "The tenant name") String tenant,
             @QueryParam("force") @DefaultValue("false") boolean force) {
-        try {
-            validateSuperUserAccess();
-            validatePoliciesReadOnlyAccess();
-        } catch (Exception e) {
-            asyncResponse.resume(e);
-            return;
-        }
-        internalDeleteTenant(asyncResponse, tenant, force);
+        final String clientAppId = clientAppId();
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> internalDeleteTenant(tenant, force))
+                .thenAccept(__ -> {
+                    log.info("[{}] Deleted tenant {}", clientAppId, tenant);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    log.error("[{}] Failed to delete tenant {}", clientAppId, tenant, cause);
+                    if (cause instanceof IllegalStateException) {
+                        asyncResponse.resume(new RestException(Status.CONFLICT, cause));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    }
+                    return null;
+                });
     }
 
-    protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant, boolean force) {
-        if (force) {
-            internalDeleteTenantForcefully(asyncResponse, tenant);
-        } else {
-            internalDeleteTenant(asyncResponse, tenant);
-        }
+    protected CompletableFuture<Void> internalDeleteTenant(String tenant, boolean force) {
+        return force ? internalDeleteTenantAsyncForcefully(tenant) : internalDeleteTenantAsync(tenant);
     }
 
-    protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant) {
-        tenantResources().tenantExistsAsync(tenant).thenApply(exists -> {
-            if (!exists) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant doesn't exist"));
-                return null;
-            }
-
-            return hasActiveNamespace(tenant)
-                    .thenCompose(ignore -> tenantResources().deleteTenantAsync(tenant))
-                    .thenCompose(ignore -> pulsar().getPulsarResources().getTopicResources()
-                            .clearTenantPersistence(tenant))
-                    .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources()
-                            .deleteTenantAsync(tenant))
-                    .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources()
+    protected CompletableFuture<Void> internalDeleteTenantAsync(String tenant) {
+        return tenantResources().tenantExistsAsync(tenant)
+                .thenAccept(exists -> {
+                    if (!exists) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant doesn't exist");
+                    }
+                })
+                .thenCompose(__ -> hasActiveNamespace(tenant))
+                .thenCompose(__ -> tenantResources().deleteTenantAsync(tenant))
+                .thenCompose(__ -> pulsar().getPulsarResources().getTopicResources().clearTenantPersistence(tenant))
+                .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources().deleteTenantAsync(tenant))
+                .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources()
                             .getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant))
-                    .thenCompose(ignore -> pulsar().getPulsarResources().getLocalPolicies()
+                .thenCompose(__ -> pulsar().getPulsarResources().getLocalPolicies()
                             .deleteLocalPoliciesTenantAsync(tenant))
-                    .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources()
-                            .deleteBundleDataTenantAsync(tenant))
-                    .whenComplete((ignore, ex) -> {
-                        if (ex != null) {
-                            log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, ex);
-                            if (ex.getCause() instanceof IllegalStateException) {
-                                asyncResponse.resume(new RestException(Status.CONFLICT, ex.getCause()));
-                            } else {
-                                asyncResponse.resume(new RestException(ex));
-                            }
-                        } else {
-                            log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
-                            asyncResponse.resume(Response.noContent().build());
-                        }
-                    });
-        });
+                .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources()
+                            .deleteBundleDataTenantAsync(tenant));
     }
 
-    protected void internalDeleteTenantForcefully(AsyncResponse asyncResponse, String tenant) {
+    protected CompletableFuture<Void> internalDeleteTenantAsyncForcefully(String tenant) {
         if (!pulsar().getConfiguration().isForceDeleteTenantAllowed()) {
-            asyncResponse.resume(
+            return FutureUtil.failedFuture(
                     new RestException(Status.METHOD_NOT_ALLOWED, "Broker doesn't allow forced deletion of tenants"));
-            return;
-        }
-
-        List<String> namespaces;
-        try {
-            namespaces = tenantResources().getListOfNamespaces(tenant);
-        } catch (Exception e) {
-            log.error("[{}] Failed to get namespaces list of {}", clientAppId(), tenant, e);
-            asyncResponse.resume(new RestException(e));
-            return;
         }
-
-        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
-        try {
-            for (String namespace : namespaces) {
-                futures.add(pulsar().getAdminClient().namespaces().deleteNamespaceAsync(namespace, true));
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, e);
-            asyncResponse.resume(new RestException(e));
-        }
-
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
-            if (exception != null) {
-                if (exception.getCause() instanceof PulsarAdminException) {
-                    asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
-                } else {
-                    log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, exception);
-                    asyncResponse.resume(new RestException(exception.getCause()));
-                }
-                return null;
-            }
-
-            // delete tenant normally
-            internalDeleteTenant(asyncResponse, tenant);
-
-            asyncResponse.resume(Response.noContent().build());
-            return null;
-        });
+        return tenantResources().getListOfNamespacesAsync(tenant)
+                .thenApply(namespaces -> {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    try {
+                        PulsarAdmin adminClient = pulsar().getAdminClient();
+                        for (String namespace : namespaces) {
+                            futures.add(adminClient.namespaces().deleteNamespaceAsync(namespace, true));
+                        }
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, e);
+                        throw new RestException(e);
+                    }
+                    return futures;
+                })
+                .thenCompose(futures -> FutureUtil.waitForAll(futures))
+                .thenCompose(__ -> internalDeleteTenantAsync(tenant));
     }
 
-    private void validateClusters(TenantInfo info) {
+    private CompletableFuture<Void> validateClustersAsync(TenantInfo info) {
         // empty cluster shouldn't be allowed
         if (info == null || info.getAllowedClusters().stream().filter(c -> !StringUtils.isBlank(c))
                 .collect(Collectors.toSet()).isEmpty()
                 || info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) {
             log.warn("[{}] Failed to validate due to clusters are empty", clientAppId());
-            throw new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty");
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty"));
         }
-
-        List<String> nonexistentClusters;
-        try {
-            Set<String> availableClusters = clusterResources().list();
+        return clusterResources().listAsync().thenAccept(availableClusters -> {
             Set<String> allowedClusters = info.getAllowedClusters();
-            nonexistentClusters = allowedClusters.stream().filter(
-                    cluster -> !(availableClusters.contains(cluster) || Constants.GLOBAL_CLUSTER.equals(cluster)))
+            List<String> nonexistentClusters = allowedClusters.stream()
+                    .filter(cluster -> !(availableClusters.contains(cluster) || GLOBAL_CLUSTER.equals(cluster)))
                     .collect(Collectors.toList());
-        } catch (Exception e) {
-            log.error("[{}] Failed to get available clusters", clientAppId(), e);
-            throw new RestException(e);
-        }
-        if (nonexistentClusters.size() > 0) {
-            log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters);
-            throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist");
-        }
+            if (nonexistentClusters.size() > 0) {
+                log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters);
+                throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist");
+            }
+        });
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 38c61d3688d..e491f78c9e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -40,6 +40,7 @@ import java.util.function.Supplier;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -64,6 +65,8 @@ import org.apache.pulsar.broker.resources.ResourceGroupResources;
 import org.apache.pulsar.broker.resources.TenantResources;
 import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.common.naming.Constants;
@@ -1042,6 +1045,17 @@ public abstract class PulsarWebResource {
         }
     }
 
+    public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {
+        return namespaceResources().getPoliciesReadOnlyAsync().thenAccept(readOnly -> {
+            if (readOnly) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Policies are read-only. Broker cannot do read-write operations");
+                }
+                throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations");
+            }
+        });
+    }
+
     protected CompletableFuture<Void> hasActiveNamespace(String tenant) {
         return tenantResources().hasActiveNamespace(tenant);
     }
@@ -1183,4 +1197,17 @@ public abstract class PulsarWebResource {
             throw new RestException(ex);
         }
     }
+
+    protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
+        Throwable realCause = FutureUtil.unwrapCompletionException(exception);
+        if (realCause instanceof WebApplicationException) {
+            asyncResponse.resume(realCause);
+        } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
+            asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
+        } else if (realCause instanceof PulsarAdminException) {
+            asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
+        } else {
+            asyncResponse.resume(new RestException(realCause));
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 583a87275bd..23c50f3879e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -411,7 +411,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
     public void properties() throws Throwable {
         Object response = asyncRequests(ctx -> properties.getTenants(ctx));
         assertEquals(response, Lists.newArrayList());
-        verify(properties, times(1)).validateSuperUserAccess();
+        verify(properties, times(1)).validateSuperUserAccessAsync();
 
         // create local cluster
         asyncRequests(ctx -> clusters.createCluster(ctx, configClusterName, ClusterDataImpl.builder().build()));
@@ -423,22 +423,22 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(allowedClusters)
                 .build();
         response = asyncRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo));
-        verify(properties, times(2)).validateSuperUserAccess();
+        verify(properties, times(2)).validateSuperUserAccessAsync();
 
         response = asyncRequests(ctx -> properties.getTenants(ctx));
         assertEquals(response, Lists.newArrayList("test-property"));
-        verify(properties, times(3)).validateSuperUserAccess();
+        verify(properties, times(3)).validateSuperUserAccessAsync();
 
         response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
         assertEquals(response, tenantInfo);
-        verify(properties, times(4)).validateSuperUserAccess();
+        verify(properties, times(4)).validateSuperUserAccessAsync();
 
         final TenantInfoImpl newPropertyAdmin = TenantInfoImpl.builder()
                 .adminRoles(Sets.newHashSet("role1", "other-role"))
                 .allowedClusters(allowedClusters)
                 .build();
         response = asyncRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin));
-        verify(properties, times(5)).validateSuperUserAccess();
+        verify(properties, times(5)).validateSuperUserAccessAsync();
 
         // Wait for updateTenant to take effect
         Thread.sleep(100);
@@ -447,7 +447,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         assertEquals(response, newPropertyAdmin);
         response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
         assertNotSame(response, tenantInfo);
-        verify(properties, times(7)).validateSuperUserAccess();
+        verify(properties, times(7)).validateSuperUserAccessAsync();
 
         // Check creating existing property
         try {