You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/12 13:24:55 UTC

[pulsar] 02/03: [Broker]make revokePermissionsOnTopic method async (#14149)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a7f3efa43abe414afd7f986d77ecb20aa47b279b
Author: Dezhi LIiu <33...@users.noreply.github.com>
AuthorDate: Wed Apr 20 11:24:14 2022 +0800

    [Broker]make revokePermissionsOnTopic method async (#14149)
    
    (cherry picked from commit d7ddda811437096b857bffff7d080a1c555f54d8)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  89 ++++++------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  17 ++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  11 +-
 .../pulsar/broker/web/PulsarWebResource.java       | 154 +++++++++++++--------
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   8 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  13 +-
 6 files changed, 185 insertions(+), 107 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7bdefb29098..baab14e88e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
 import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
@@ -320,49 +321,54 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    private void revokePermissions(String topicUri, String role) {
-        Policies policies;
-        try {
-            policies = namespaceResources().getPolicies(namespaceName)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        }
-        if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
-                || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
-            log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(),
-                    role, topicUri);
-            throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
-        }
-        try {
-            // Write the new policies to metadata store
-            namespaceResources().setPolicies(namespaceName, p -> {
-                p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
-                return p;
-            });
-            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri);
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        }
-
+    private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
+        return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
+                policiesOptional -> {
+                    Policies policies = policiesOptional.orElseThrow(() ->
+                            new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                    if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
+                            || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
+                        log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
+                                clientAppId(), role, topicUri);
+                        return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                                "Permissions are not set at the topic level"));
+                    }
+                    // Write the new policies to metadata store
+                    return namespaceResources().setPoliciesAsync(namespaceName, p -> {
+                        p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+                        return p;
+                    }).thenAccept(__ ->
+                            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
+                            topicUri)
+                    );
+                }
+        );
     }
 
-    protected void internalRevokePermissionsOnTopic(String role) {
+    protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                revokePermissions(topicNamePartition.toString(), role);
-            }
-        }
-        revokePermissions(topicName.toString(), role);
+        validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+            getPartitionedTopicMetadataAsync(topicName, true, false)
+                .thenCompose(metadata -> {
+                    int numPartitions = metadata.partitions;
+                    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                    if (numPartitions > 0) {
+                        for (int i = 0; i < numPartitions; i++) {
+                            TopicName topicNamePartition = topicName.getPartition(i);
+                            future = future.thenComposeAsync(unused ->
+                                    revokePermissionsAsync(topicNamePartition.toString(), role));
+                        }
+                    }
+                    return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role))
+                            .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
+                }))
+            ).exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicName, realCause);
+                    resumeAsyncResponseExceptionally(asyncResponse, realCause);
+                    return null;
+                });
     }
 
     protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> properties) {
@@ -3999,7 +4005,8 @@ public class PersistentTopicsBase extends AdminResource {
             } catch (RestException e) {
                 try {
                     validateAdminAccessForTenant(pulsar,
-                            clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
+                            clientAppId, originalPrincipal, topicName.getTenant(), authenticationData,
+                            pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
                 } catch (RestException authException) {
                     log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
                     throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 283ba0ebccd..657f73824aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -141,11 +141,18 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Permissions are not set at the topic level")})
-    public void revokePermissionsOnTopic(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalRevokePermissionsOnTopic(role);
+    public void revokePermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("role") String role) {
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalRevokePermissionsOnTopic(asyncResponse, role);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c0f69f3270c..3f1350105ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -192,6 +192,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Permissions are not set at the topic level"),
             @ApiResponse(code = 500, message = "Internal server error")})
     public void revokePermissionsOnTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -200,8 +201,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Client role to which grant permissions", required = true)
             @PathParam("role") String role) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalRevokePermissionsOnTopic(role);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalRevokePermissionsOnTopic(asyncResponse, role);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index d70934b08c7..5f7fb8fad41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
@@ -258,7 +259,8 @@ public abstract class PulsarWebResource {
      */
     protected void validateAdminAccessForTenant(String tenant) {
         try {
-            validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData());
+            validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData(),
+                    config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
@@ -268,65 +270,109 @@ public abstract class PulsarWebResource {
     }
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
-                                                       String originalPrincipal, String tenant,
-                                                       AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                String originalPrincipal, String tenant,
+                                                AuthenticationDataSource authenticationData,
+                                                long timeout, TimeUnit unit) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, originalPrincipal, tenant, authenticationData)
+                    .get(timeout, unit);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            Throwable realCause = FutureUtil.unwrapCompletionException(e);
+            if (realCause instanceof WebApplicationException) {
+                throw (WebApplicationException) realCause;
+            } else {
+                throw new RestException(realCause);
+            }
+        }
+    }
+
+    /**
+     * Checks that the http client role has admin access to the specified tenant async.
+     *
+     * @param tenant the tenant id
+     */
+    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String tenant) {
+        return validateAdminAccessForTenantAsync(pulsar(), clientAppId(), originalPrincipal(), tenant,
+                clientAuthData());
+    }
+
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId,
+            String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
         if (log.isDebugEnabled()) {
             log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant,
                     (isClientAuthenticated(clientAppId)), clientAppId);
         }
-
-        TenantInfo tenantInfo = pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant does not exist"));
-
-        if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
-            if (!isClientAuthenticated(clientAppId)) {
-                throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
-            }
-
-            validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId, originalPrincipal);
-
-            if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-                CompletableFuture<Boolean> isProxySuperUserFuture;
-                CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
-                try {
-                    AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService();
-                    isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId, authenticationData);
-
-                    isOriginalPrincipalSuperUserFuture =
-                            authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
-                    boolean proxyAuthorized = isProxySuperUserFuture.get()
-                            || authorizationService.isTenantAdmin(tenant, clientAppId,
-                            tenantInfo, authenticationData).get();
-                    boolean originalPrincipalAuthorized =
-                            isOriginalPrincipalSuperUserFuture.get() || authorizationService.isTenantAdmin(tenant,
-                                    originalPrincipal, tenantInfo, authenticationData).get();
-                    if (!proxyAuthorized || !originalPrincipalAuthorized) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                String.format("Proxy not authorized to access resource (proxy:%s,original:%s)",
-                                        clientAppId, originalPrincipal));
+        return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenCompose(tenantInfoOptional -> {
+                    if (!tenantInfoOptional.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
                     }
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-                }
-                log.debug("Successfully authorized {} (proxied by {}) on tenant {}",
-                          originalPrincipal, clientAppId, tenant);
-            } else {
-                if (!pulsar.getBrokerService()
-                        .getAuthorizationService()
-                        .isSuperUser(clientAppId, authenticationData)
-                        .join()) {
-                    if (!pulsar.getBrokerService().getAuthorizationService()
-                            .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                "Don't have permission to administrate resources on this tenant");
+                    TenantInfo tenantInfo = tenantInfoOptional.get();
+                    if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration()
+                            .isAuthorizationEnabled()) {
+                        if (!isClientAuthenticated(clientAppId)) {
+                            throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+                        }
+                        validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId,
+                                originalPrincipal);
+                        if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                            AuthorizationService authorizationService =
+                                    pulsar.getBrokerService().getAuthorizationService();
+                            return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+                                            authenticationData)
+                                .thenCompose(isTenantAdmin -> {
+                                    String debugMsg = "Successfully authorized {} (proxied by {}) on tenant {}";
+                                    if (!isTenantAdmin) {
+                                            return authorizationService.isSuperUser(clientAppId, authenticationData)
+                                                .thenCombine(authorizationService.isSuperUser(originalPrincipal,
+                                                             authenticationData),
+                                                     (proxyAuthorized, originalPrincipalAuthorized) -> {
+                                                         if (!proxyAuthorized || !originalPrincipalAuthorized) {
+                                                             throw new RestException(Status.UNAUTHORIZED,
+                                                                     String.format("Proxy not authorized to access "
+                                                                                     + "resource (proxy:%s,original:%s)"
+                                                                             , clientAppId, originalPrincipal));
+                                                         } else {
+                                                             if (log.isDebugEnabled()) {
+                                                                 log.debug(debugMsg, originalPrincipal, clientAppId,
+                                                                         tenant);
+                                                             }
+                                                             return null;
+                                                         }
+                                                     });
+                                    } else {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug(debugMsg, originalPrincipal, clientAppId, tenant);
+                                        }
+                                        return CompletableFuture.completedFuture(null);
+                                    }
+                                });
+                        } else {
+                            return pulsar.getBrokerService()
+                                    .getAuthorizationService()
+                                    .isSuperUser(clientAppId, authenticationData)
+                                    .thenCompose(isSuperUser -> {
+                                        if (!isSuperUser) {
+                                            return pulsar.getBrokerService().getAuthorizationService()
+                                                    .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData);
+                                        } else {
+                                            return CompletableFuture.completedFuture(true);
+                                        }
+                                    }).thenAccept(authorized -> {
+                                        if (!authorized) {
+                                            throw new RestException(Status.UNAUTHORIZED,
+                                                    "Don't have permission to administrate resources on this tenant");
+                                        } else {
+                                            log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant);
+                                        }
+                                    });
+                        }
+                    } else {
+                        return CompletableFuture.completedFuture(null);
                     }
-                }
-
-                log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant);
-            }
-        }
+                });
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 0a8eceddf10..bd70861cd84 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -819,14 +819,16 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 namespace, topic);
         assertEquals(permission.get(role), actions);
         // remove permission
-        persistentTopics.revokePermissionsOnTopic(property, cluster, namespace, topic, role);
-
+        response = mock(AsyncResponse.class);
+        persistentTopics.revokePermissionsOnTopic(response, property, cluster, namespace, topic, role);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         // verify removed permission
         Awaitility.await().untilAsserted(() -> {
             Map<String, Set<AuthAction>> p = persistentTopics.getPermissionsOnTopic(property, cluster, namespace, topic);
             assertTrue(p.isEmpty());
         });
-
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index b8837e5d686..4ccfc08e9b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -683,7 +683,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
         persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
-        persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, topicName, role);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, topicName, role);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
         Assert.assertEquals(permissions.get(role), null);
     }
@@ -702,7 +706,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
         persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role, expectActions);
-        persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role);
+        response = mock(AsyncResponse.class);
+        persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
         Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
                 partitionedTopicName);
         Assert.assertEquals(permissions.get(role), null);