You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/16 08:14:45 UTC
[pulsar] branch master updated: [improve][broker] Make some methods in TenantsBase async. (#15603)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5fedf63a436 [improve][broker] Make some methods in TenantsBase async. (#15603)
5fedf63a436 is described below
commit 5fedf63a436792636e4f58b444b2e0dc3dab8746
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon May 16 16:14:37 2022 +0800
[improve][broker] Make some methods in TenantsBase async. (#15603)
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 -
.../pulsar/broker/admin/impl/TenantsBase.java | 345 ++++++++-------------
.../pulsar/broker/web/PulsarWebResource.java | 27 ++
.../org/apache/pulsar/broker/admin/AdminTest.java | 12 +-
4 files changed, 169 insertions(+), 230 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3a396411c63..7f77568b8bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -40,10 +40,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
@@ -741,19 +739,6 @@ public abstract class AdminResource extends PulsarWebResource {
return future;
}
- protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
- Throwable realCause = FutureUtil.unwrapCompletionException(exception);
- if (realCause instanceof WebApplicationException) {
- asyncResponse.resume(realCause);
- } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
- asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
- } else if (realCause instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
- } else {
- asyncResponse.resume(new RestException(realCause));
- }
- }
-
protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index 4f1bad29736..045cafbfd53 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin.impl;
+import static org.apache.pulsar.common.naming.Constants.GLOBAL_CLUSTER;
import com.google.common.collect.Lists;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -45,8 +46,7 @@ import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.naming.Constants;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -64,23 +64,18 @@ public class TenantsBase extends PulsarWebResource {
@ApiResponse(code = 404, message = "Tenant doesn't exist")})
public void getTenants(@Suspended final AsyncResponse asyncResponse) {
final String clientAppId = clientAppId();
- try {
- validateSuperUserAccess();
- } catch (Exception e) {
- asyncResponse.resume(e);
- return;
- }
- tenantResources().listTenantsAsync().whenComplete((tenants, e) -> {
- if (e != null) {
- log.error("[{}] Failed to get tenants list", clientAppId, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- // deep copy the tenants to avoid concurrent sort exception
- List<String> deepCopy = new ArrayList<>(tenants);
- deepCopy.sort(null);
- asyncResponse.resume(deepCopy);
- });
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> tenantResources().listTenantsAsync())
+ .thenAccept(tenants -> {
+ // deep copy the tenants to avoid concurrent sort exception
+ List<String> deepCopy = new ArrayList<>(tenants);
+ deepCopy.sort(null);
+ asyncResponse.resume(deepCopy);
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get tenants list", clientAppId, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -91,22 +86,20 @@ public class TenantsBase extends PulsarWebResource {
public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "The tenant name") @PathParam("tenant") String tenant) {
final String clientAppId = clientAppId();
- try {
- validateSuperUserAccess();
- } catch (Exception e) {
- asyncResponse.resume(e);
- }
-
- tenantResources().getTenantAsync(tenant).whenComplete((tenantInfo, e) -> {
- if (e != null) {
- log.error("[{}] Failed to get Tenant {}", clientAppId, e.getMessage());
- asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get Tenant"));
- return;
- }
- boolean response = tenantInfo.isPresent() ? asyncResponse.resume(tenantInfo.get())
- : asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant does not exist"));
- return;
- });
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> tenantResources().getTenantAsync(tenant))
+ .thenApply(tenantInfo -> {
+ if (!tenantInfo.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
+ }
+ return tenantInfo.get();
+ })
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get tenant admin {}", clientAppId, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -120,58 +113,44 @@ public class TenantsBase extends PulsarWebResource {
public void createTenant(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "The tenant name") @PathParam("tenant") String tenant,
@ApiParam(value = "TenantInfo") TenantInfoImpl tenantInfo) {
-
final String clientAppId = clientAppId();
try {
- validateSuperUserAccess();
- validatePoliciesReadOnlyAccess();
- validateClusters(tenantInfo);
NamedEntity.checkName(tenant);
} catch (IllegalArgumentException e) {
- log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId(), tenant, e);
+ log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId, tenant, e);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
return;
- } catch (Exception e) {
- asyncResponse.resume(e);
- return;
}
-
- tenantResources().listTenantsAsync().whenComplete((tenants, e) -> {
- if (e != null) {
- log.error("[{}] Failed to create tenant ", clientAppId, e.getCause());
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- int maxTenants = pulsar().getConfiguration().getMaxTenants();
- // Due to the cost of distributed locks, no locks are added here.
- // In a concurrent scenario, the threshold will be exceeded.
- if (maxTenants > 0) {
- if (tenants != null && tenants.size() >= maxTenants) {
- asyncResponse.resume(
- new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants"));
- return;
- }
- }
- tenantResources().tenantExistsAsync(tenant).thenAccept(exist -> {
- if (exist) {
- asyncResponse.resume(new RestException(Status.CONFLICT, "Tenant already exist"));
- return;
- }
- tenantResources().createTenantAsync(tenant, tenantInfo).thenAccept((r) -> {
- log.info("[{}] Created tenant {}", clientAppId(), tenant);
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> validateClustersAsync(tenantInfo))
+ .thenCompose(__ -> tenantResources().listTenantsAsync())
+ .thenAccept(tenants -> {
+ int maxTenants = pulsar().getConfiguration().getMaxTenants();
+ // Due to the cost of distributed locks, no locks are added here.
+ // In a concurrent scenario, the threshold will be exceeded.
+ if (maxTenants > 0) {
+ if (tenants != null && tenants.size() >= maxTenants) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants");
+ }
+ }
+ })
+ .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
+ .thenAccept(exist -> {
+ if (exist) {
+ throw new RestException(Status.CONFLICT, "Tenant already exist");
+ }
+ })
+ .thenCompose(__ -> tenantResources().createTenantAsync(tenant, tenantInfo))
+ .thenAccept(__ -> {
+ log.info("[{}] Created tenant {}", clientAppId, tenant);
asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
+ })
+ .exceptionally(ex -> {
log.error("[{}] Failed to create tenant {}", clientAppId, tenant, ex);
- asyncResponse.resume(new RestException(ex));
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
- }).exceptionally(ex -> {
- log.error("[{}] Failed to create tenant {}", clientAppId(), tenant, ex);
- asyncResponse.resume(new RestException(ex));
- return null;
- });
- });
}
@POST
@@ -186,42 +165,28 @@ public class TenantsBase extends PulsarWebResource {
public void updateTenant(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "The tenant name") @PathParam("tenant") String tenant,
@ApiParam(value = "TenantInfo") TenantInfoImpl newTenantAdmin) {
- try {
- validateSuperUserAccess();
- validatePoliciesReadOnlyAccess();
- validateClusters(newTenantAdmin);
- } catch (Exception e) {
- asyncResponse.resume(e);
- return;
- }
-
- final String clientAddId = clientAppId();
- tenantResources().getTenantAsync(tenant).thenAccept(tenantAdmin -> {
- if (!tenantAdmin.isPresent()) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant " + tenant + " not found"));
- return;
- }
- TenantInfo oldTenantAdmin = tenantAdmin.get();
- Set<String> newClusters = new HashSet<>(newTenantAdmin.getAllowedClusters());
- canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), newClusters).thenApply(r -> {
- tenantResources().updateTenantAsync(tenant, old -> newTenantAdmin).thenAccept(done -> {
- log.info("Successfully updated tenant info {}", tenant);
+ final String clientAppId = clientAppId();
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> validateClustersAsync(newTenantAdmin))
+ .thenCompose(__ -> tenantResources().getTenantAsync(tenant))
+ .thenCompose(tenantAdmin -> {
+ if (!tenantAdmin.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant " + tenant + " not found");
+ }
+ TenantInfo oldTenantAdmin = tenantAdmin.get();
+ Set<String> newClusters = new HashSet<>(newTenantAdmin.getAllowedClusters());
+ return canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), newClusters);
+ })
+ .thenCompose(__ -> tenantResources().updateTenantAsync(tenant, old -> newTenantAdmin))
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully updated tenant info {}", clientAppId, tenant);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
- log.warn("Failed to update tenant {}", tenant, ex.getCause());
- asyncResponse.resume(new RestException(ex));
+ log.warn("[{}] Failed to update tenant {}", clientAppId, tenant, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
- return null;
- }).exceptionally(nsEx -> {
- asyncResponse.resume(nsEx.getCause());
- return null;
- });
- }).exceptionally(ex -> {
- log.error("[{}] Failed to get tenant {}", clientAddId, tenant, ex.getCause());
- asyncResponse.resume(new RestException(ex));
- return null;
- });
}
@DELETE
@@ -234,127 +199,89 @@ public class TenantsBase extends PulsarWebResource {
public void deleteTenant(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") @ApiParam(value = "The tenant name") String tenant,
@QueryParam("force") @DefaultValue("false") boolean force) {
- try {
- validateSuperUserAccess();
- validatePoliciesReadOnlyAccess();
- } catch (Exception e) {
- asyncResponse.resume(e);
- return;
- }
- internalDeleteTenant(asyncResponse, tenant, force);
+ final String clientAppId = clientAppId();
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> internalDeleteTenant(tenant, force))
+ .thenAccept(__ -> {
+ log.info("[{}] Deleted tenant {}", clientAppId, tenant);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] Failed to delete tenant {}", clientAppId, tenant, cause);
+ if (cause instanceof IllegalStateException) {
+ asyncResponse.resume(new RestException(Status.CONFLICT, cause));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ }
+ return null;
+ });
}
- protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant, boolean force) {
- if (force) {
- internalDeleteTenantForcefully(asyncResponse, tenant);
- } else {
- internalDeleteTenant(asyncResponse, tenant);
- }
+ protected CompletableFuture<Void> internalDeleteTenant(String tenant, boolean force) {
+ return force ? internalDeleteTenantAsyncForcefully(tenant) : internalDeleteTenantAsync(tenant);
}
- protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant) {
- tenantResources().tenantExistsAsync(tenant).thenApply(exists -> {
- if (!exists) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant doesn't exist"));
- return null;
- }
-
- return hasActiveNamespace(tenant)
- .thenCompose(ignore -> tenantResources().deleteTenantAsync(tenant))
- .thenCompose(ignore -> pulsar().getPulsarResources().getTopicResources()
- .clearTenantPersistence(tenant))
- .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources()
- .deleteTenantAsync(tenant))
- .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources()
+ protected CompletableFuture<Void> internalDeleteTenantAsync(String tenant) {
+ return tenantResources().tenantExistsAsync(tenant)
+ .thenAccept(exists -> {
+ if (!exists) {
+ throw new RestException(Status.NOT_FOUND, "Tenant doesn't exist");
+ }
+ })
+ .thenCompose(__ -> hasActiveNamespace(tenant))
+ .thenCompose(__ -> tenantResources().deleteTenantAsync(tenant))
+ .thenCompose(__ -> pulsar().getPulsarResources().getTopicResources().clearTenantPersistence(tenant))
+ .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources().deleteTenantAsync(tenant))
+ .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant))
- .thenCompose(ignore -> pulsar().getPulsarResources().getLocalPolicies()
+ .thenCompose(__ -> pulsar().getPulsarResources().getLocalPolicies()
.deleteLocalPoliciesTenantAsync(tenant))
- .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources()
- .deleteBundleDataTenantAsync(tenant))
- .whenComplete((ignore, ex) -> {
- if (ex != null) {
- log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, ex);
- if (ex.getCause() instanceof IllegalStateException) {
- asyncResponse.resume(new RestException(Status.CONFLICT, ex.getCause()));
- } else {
- asyncResponse.resume(new RestException(ex));
- }
- } else {
- log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
- asyncResponse.resume(Response.noContent().build());
- }
- });
- });
+ .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources()
+ .deleteBundleDataTenantAsync(tenant));
}
- protected void internalDeleteTenantForcefully(AsyncResponse asyncResponse, String tenant) {
+ protected CompletableFuture<Void> internalDeleteTenantAsyncForcefully(String tenant) {
if (!pulsar().getConfiguration().isForceDeleteTenantAllowed()) {
- asyncResponse.resume(
+ return FutureUtil.failedFuture(
new RestException(Status.METHOD_NOT_ALLOWED, "Broker doesn't allow forced deletion of tenants"));
- return;
- }
-
- List<String> namespaces;
- try {
- namespaces = tenantResources().getListOfNamespaces(tenant);
- } catch (Exception e) {
- log.error("[{}] Failed to get namespaces list of {}", clientAppId(), tenant, e);
- asyncResponse.resume(new RestException(e));
- return;
}
-
- final List<CompletableFuture<Void>> futures = Lists.newArrayList();
- try {
- for (String namespace : namespaces) {
- futures.add(pulsar().getAdminClient().namespaces().deleteNamespaceAsync(namespace, true));
- }
- } catch (Exception e) {
- log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, e);
- asyncResponse.resume(new RestException(e));
- }
-
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
- } else {
- log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, exception);
- asyncResponse.resume(new RestException(exception.getCause()));
- }
- return null;
- }
-
- // delete tenant normally
- internalDeleteTenant(asyncResponse, tenant);
-
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
+ return tenantResources().getListOfNamespacesAsync(tenant)
+ .thenApply(namespaces -> {
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ try {
+ PulsarAdmin adminClient = pulsar().getAdminClient();
+ for (String namespace : namespaces) {
+ futures.add(adminClient.namespaces().deleteNamespaceAsync(namespace, true));
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, e);
+ throw new RestException(e);
+ }
+ return futures;
+ })
+ .thenCompose(futures -> FutureUtil.waitForAll(futures))
+ .thenCompose(__ -> internalDeleteTenantAsync(tenant));
}
- private void validateClusters(TenantInfo info) {
+ private CompletableFuture<Void> validateClustersAsync(TenantInfo info) {
// empty cluster shouldn't be allowed
if (info == null || info.getAllowedClusters().stream().filter(c -> !StringUtils.isBlank(c))
.collect(Collectors.toSet()).isEmpty()
|| info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) {
log.warn("[{}] Failed to validate due to clusters are empty", clientAppId());
- throw new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty");
+ return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty"));
}
-
- List<String> nonexistentClusters;
- try {
- Set<String> availableClusters = clusterResources().list();
+ return clusterResources().listAsync().thenAccept(availableClusters -> {
Set<String> allowedClusters = info.getAllowedClusters();
- nonexistentClusters = allowedClusters.stream().filter(
- cluster -> !(availableClusters.contains(cluster) || Constants.GLOBAL_CLUSTER.equals(cluster)))
+ List<String> nonexistentClusters = allowedClusters.stream()
+ .filter(cluster -> !(availableClusters.contains(cluster) || GLOBAL_CLUSTER.equals(cluster)))
.collect(Collectors.toList());
- } catch (Exception e) {
- log.error("[{}] Failed to get available clusters", clientAppId(), e);
- throw new RestException(e);
- }
- if (nonexistentClusters.size() > 0) {
- log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters);
- throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist");
- }
+ if (nonexistentClusters.size() > 0) {
+ log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters);
+ throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist");
+ }
+ });
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 38c61d3688d..e491f78c9e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -40,6 +40,7 @@ import java.util.function.Supplier;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@@ -64,6 +65,8 @@ import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.common.naming.Constants;
@@ -1042,6 +1045,17 @@ public abstract class PulsarWebResource {
}
}
+ public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {
+ return namespaceResources().getPoliciesReadOnlyAsync().thenAccept(readOnly -> {
+ if (readOnly) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot do read-write operations");
+ }
+ throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations");
+ }
+ });
+ }
+
protected CompletableFuture<Void> hasActiveNamespace(String tenant) {
return tenantResources().hasActiveNamespace(tenant);
}
@@ -1183,4 +1197,17 @@ public abstract class PulsarWebResource {
throw new RestException(ex);
}
}
+
+ protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(exception);
+ if (realCause instanceof WebApplicationException) {
+ asyncResponse.resume(realCause);
+ } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
+ asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
+ } else if (realCause instanceof PulsarAdminException) {
+ asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
+ } else {
+ asyncResponse.resume(new RestException(realCause));
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 583a87275bd..23c50f3879e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -411,7 +411,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
public void properties() throws Throwable {
Object response = asyncRequests(ctx -> properties.getTenants(ctx));
assertEquals(response, Lists.newArrayList());
- verify(properties, times(1)).validateSuperUserAccess();
+ verify(properties, times(1)).validateSuperUserAccessAsync();
// create local cluster
asyncRequests(ctx -> clusters.createCluster(ctx, configClusterName, ClusterDataImpl.builder().build()));
@@ -423,22 +423,22 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
.allowedClusters(allowedClusters)
.build();
response = asyncRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo));
- verify(properties, times(2)).validateSuperUserAccess();
+ verify(properties, times(2)).validateSuperUserAccessAsync();
response = asyncRequests(ctx -> properties.getTenants(ctx));
assertEquals(response, Lists.newArrayList("test-property"));
- verify(properties, times(3)).validateSuperUserAccess();
+ verify(properties, times(3)).validateSuperUserAccessAsync();
response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
assertEquals(response, tenantInfo);
- verify(properties, times(4)).validateSuperUserAccess();
+ verify(properties, times(4)).validateSuperUserAccessAsync();
final TenantInfoImpl newPropertyAdmin = TenantInfoImpl.builder()
.adminRoles(Sets.newHashSet("role1", "other-role"))
.allowedClusters(allowedClusters)
.build();
response = asyncRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin));
- verify(properties, times(5)).validateSuperUserAccess();
+ verify(properties, times(5)).validateSuperUserAccessAsync();
// Wait for updateTenant to take effect
Thread.sleep(100);
@@ -447,7 +447,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
assertEquals(response, newPropertyAdmin);
response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
assertNotSame(response, tenantInfo);
- verify(properties, times(7)).validateSuperUserAccess();
+ verify(properties, times(7)).validateSuperUserAccessAsync();
// Check creating existing property
try {