You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/12 13:24:55 UTC
[pulsar] 02/03: [Broker]make revokePermissionsOnTopic method async (#14149)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a7f3efa43abe414afd7f986d77ecb20aa47b279b
Author: Dezhi LIiu <33...@users.noreply.github.com>
AuthorDate: Wed Apr 20 11:24:14 2022 +0800
[Broker]make revokePermissionsOnTopic method async (#14149)
(cherry picked from commit d7ddda811437096b857bffff7d080a1c555f54d8)
---
.../broker/admin/impl/PersistentTopicsBase.java | 89 ++++++------
.../pulsar/broker/admin/v1/PersistentTopics.java | 17 ++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 11 +-
.../pulsar/broker/web/PulsarWebResource.java | 154 +++++++++++++--------
.../org/apache/pulsar/broker/admin/AdminTest.java | 8 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 13 +-
6 files changed, 185 insertions(+), 107 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7bdefb29098..baab14e88e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
@@ -320,49 +321,54 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- private void revokePermissions(String topicUri, String role) {
- Policies policies;
- try {
- policies = namespaceResources().getPolicies(namespaceName)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
- } catch (Exception e) {
- log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
- throw new RestException(e);
- }
- if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
- || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
- log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(),
- role, topicUri);
- throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
- }
- try {
- // Write the new policies to metadata store
- namespaceResources().setPolicies(namespaceName, p -> {
- p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
- return p;
- });
- log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri);
- } catch (Exception e) {
- log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
- throw new RestException(e);
- }
-
+ private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
+ return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
+ policiesOptional -> {
+ Policies policies = policiesOptional.orElseThrow(() ->
+ new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
+ || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
+ log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
+ clientAppId(), role, topicUri);
+ return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+ "Permissions are not set at the topic level"));
+ }
+ // Write the new policies to metadata store
+ return namespaceResources().setPoliciesAsync(namespaceName, p -> {
+ p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+ return p;
+ }).thenAccept(__ ->
+ log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
+ topicUri)
+ );
+ }
+ );
}
- protected void internalRevokePermissionsOnTopic(String role) {
+ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
- validateAdminAccessForTenant(namespaceName.getTenant());
- validatePoliciesReadOnlyAccess();
-
- PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
- int numPartitions = meta.partitions;
- if (numPartitions > 0) {
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- revokePermissions(topicNamePartition.toString(), role);
- }
- }
- revokePermissions(topicName.toString(), role);
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+ getPartitionedTopicMetadataAsync(topicName, true, false)
+ .thenCompose(metadata -> {
+ int numPartitions = metadata.partitions;
+ CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+ if (numPartitions > 0) {
+ for (int i = 0; i < numPartitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ future = future.thenComposeAsync(unused ->
+ revokePermissionsAsync(topicNamePartition.toString(), role));
+ }
+ }
+ return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role))
+ .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
+ }))
+ ).exceptionally(ex -> {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicName, realCause);
+ resumeAsyncResponseExceptionally(asyncResponse, realCause);
+ return null;
+ });
}
protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> properties) {
@@ -3999,7 +4005,8 @@ public class PersistentTopicsBase extends AdminResource {
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar,
- clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
+ clientAppId, originalPrincipal, topicName.getTenant(), authenticationData,
+ pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (RestException authException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 283ba0ebccd..657f73824aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -141,11 +141,18 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 412, message = "Permissions are not set at the topic level")})
- public void revokePermissionsOnTopic(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalRevokePermissionsOnTopic(role);
+ public void revokePermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+ @PathParam("role") String role) {
+ try {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalRevokePermissionsOnTopic(asyncResponse, role);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c0f69f3270c..3f1350105ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -192,6 +192,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 412, message = "Permissions are not set at the topic level"),
@ApiResponse(code = 500, message = "Internal server error")})
public void revokePermissionsOnTopic(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -200,8 +201,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Client role to which grant permissions", required = true)
@PathParam("role") String role) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalRevokePermissionsOnTopic(role);
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalRevokePermissionsOnTopic(asyncResponse, role);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index d70934b08c7..5f7fb8fad41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
@@ -258,7 +259,8 @@ public abstract class PulsarWebResource {
*/
protected void validateAdminAccessForTenant(String tenant) {
try {
- validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData());
+ validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData(),
+ config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (RestException e) {
throw e;
} catch (Exception e) {
@@ -268,65 +270,109 @@ public abstract class PulsarWebResource {
}
protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
- String originalPrincipal, String tenant,
- AuthenticationDataSource authenticationData)
- throws Exception {
+ String originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData,
+ long timeout, TimeUnit unit) {
+ try {
+ validateAdminAccessForTenantAsync(pulsar, clientAppId, originalPrincipal, tenant, authenticationData)
+ .get(timeout, unit);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(e);
+ if (realCause instanceof WebApplicationException) {
+ throw (WebApplicationException) realCause;
+ } else {
+ throw new RestException(realCause);
+ }
+ }
+ }
+
+ /**
+ * Asynchronously checks that the HTTP client role has admin access to the specified tenant.
+ *
+ * @param tenant the tenant id
+ */
+ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String tenant) {
+ return validateAdminAccessForTenantAsync(pulsar(), clientAppId(), originalPrincipal(), tenant,
+ clientAuthData());
+ }
+
+ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+ PulsarService pulsar, String clientAppId,
+ String originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData) {
if (log.isDebugEnabled()) {
log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant,
(isClientAuthenticated(clientAppId)), clientAppId);
}
-
- TenantInfo tenantInfo = pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant does not exist"));
-
- if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
- if (!isClientAuthenticated(clientAppId)) {
- throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
- }
-
- validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId, originalPrincipal);
-
- if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
- CompletableFuture<Boolean> isProxySuperUserFuture;
- CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
- try {
- AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService();
- isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId, authenticationData);
-
- isOriginalPrincipalSuperUserFuture =
- authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
- boolean proxyAuthorized = isProxySuperUserFuture.get()
- || authorizationService.isTenantAdmin(tenant, clientAppId,
- tenantInfo, authenticationData).get();
- boolean originalPrincipalAuthorized =
- isOriginalPrincipalSuperUserFuture.get() || authorizationService.isTenantAdmin(tenant,
- originalPrincipal, tenantInfo, authenticationData).get();
- if (!proxyAuthorized || !originalPrincipalAuthorized) {
- throw new RestException(Status.UNAUTHORIZED,
- String.format("Proxy not authorized to access resource (proxy:%s,original:%s)",
- clientAppId, originalPrincipal));
+ return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+ .thenCompose(tenantInfoOptional -> {
+ if (!tenantInfoOptional.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
}
- } catch (InterruptedException | ExecutionException e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
- log.debug("Successfully authorized {} (proxied by {}) on tenant {}",
- originalPrincipal, clientAppId, tenant);
- } else {
- if (!pulsar.getBrokerService()
- .getAuthorizationService()
- .isSuperUser(clientAppId, authenticationData)
- .join()) {
- if (!pulsar.getBrokerService().getAuthorizationService()
- .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) {
- throw new RestException(Status.UNAUTHORIZED,
- "Don't have permission to administrate resources on this tenant");
+ TenantInfo tenantInfo = tenantInfoOptional.get();
+ if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration()
+ .isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId)) {
+ throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+ }
+ validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId,
+ originalPrincipal);
+ if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ AuthorizationService authorizationService =
+ pulsar.getBrokerService().getAuthorizationService();
+ return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+ authenticationData)
+ .thenCompose(isTenantAdmin -> {
+ String debugMsg = "Successfully authorized {} (proxied by {}) on tenant {}";
+ if (!isTenantAdmin) {
+ return authorizationService.isSuperUser(clientAppId, authenticationData)
+ .thenCombine(authorizationService.isSuperUser(originalPrincipal,
+ authenticationData),
+ (proxyAuthorized, originalPrincipalAuthorized) -> {
+ if (!proxyAuthorized || !originalPrincipalAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED,
+ String.format("Proxy not authorized to access "
+ + "resource (proxy:%s,original:%s)"
+ , clientAppId, originalPrincipal));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(debugMsg, originalPrincipal, clientAppId,
+ tenant);
+ }
+ return null;
+ }
+ });
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(debugMsg, originalPrincipal, clientAppId, tenant);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ } else {
+ return pulsar.getBrokerService()
+ .getAuthorizationService()
+ .isSuperUser(clientAppId, authenticationData)
+ .thenCompose(isSuperUser -> {
+ if (!isSuperUser) {
+ return pulsar.getBrokerService().getAuthorizationService()
+ .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData);
+ } else {
+ return CompletableFuture.completedFuture(true);
+ }
+ }).thenAccept(authorized -> {
+ if (!authorized) {
+ throw new RestException(Status.UNAUTHORIZED,
+ "Don't have permission to administrate resources on this tenant");
+ } else {
+ log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant);
+ }
+ });
+ }
+ } else {
+ return CompletableFuture.completedFuture(null);
}
- }
-
- log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant);
- }
- }
+ });
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 0a8eceddf10..bd70861cd84 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -819,14 +819,16 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
namespace, topic);
assertEquals(permission.get(role), actions);
// remove permission
- persistentTopics.revokePermissionsOnTopic(property, cluster, namespace, topic, role);
-
+ response = mock(AsyncResponse.class);
+ persistentTopics.revokePermissionsOnTopic(response, property, cluster, namespace, topic, role);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// verify removed permission
Awaitility.await().untilAsserted(() -> {
Map<String, Set<AuthAction>> p = persistentTopics.getPermissionsOnTopic(property, cluster, namespace, topic);
assertTrue(p.isEmpty());
});
-
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index b8837e5d686..4ccfc08e9b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -683,7 +683,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
- persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, topicName, role);
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, topicName, role);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
Assert.assertEquals(permissions.get(role), null);
}
@@ -702,7 +706,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role, expectActions);
- persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role);
+ response = mock(AsyncResponse.class);
+ persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
partitionedTopicName);
Assert.assertEquals(permissions.get(role), null);