You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/24 00:39:15 UTC

[pulsar] branch master updated: [Broker] make grantPermissionsOnTopic method async (#14152)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1344b33e411 [Broker] make grantPermissionsOnTopic method async (#14152)
1344b33e411 is described below

commit 1344b33e411e234b55595307dc6c1e3be14b8a55
Author: Dezhi LIiu <33...@users.noreply.github.com>
AuthorDate: Sun Apr 24 08:39:09 2022 +0800

    [Broker] make grantPermissionsOnTopic method async (#14152)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 95 ++++++++++++----------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 18 ++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 11 ++-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  6 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 26 +++++-
 5 files changed, 101 insertions(+), 55 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index addf02f8005..909c7de8d74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -256,53 +256,62 @@ public class PersistentTopicsBase extends AdminResource {
         validateTopicOwnership(topicName, authoritative);
     }
 
-    private void grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
-        try {
-            AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
-            if (null != authService) {
-                authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/).get();
-            } else {
-                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
-            }
-            log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
-                    topicUri);
-        } catch (InterruptedException e) {
-            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            // The IllegalArgumentException and the IllegalStateException were historically thrown by the
-            // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
-            if (e.getCause() instanceof MetadataStoreException.NotFoundException
-                    || e.getCause() instanceof IllegalArgumentException) {
-                log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist", clientAppId(),
-                        topicUri, e);
-                throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
-            } else if (e.getCause() instanceof MetadataStoreException.BadVersionException
-                    || e.getCause() instanceof IllegalStateException) {
-                log.warn("[{}] Failed to set permissions for topic {}: {}",
-                        clientAppId(), topicUri, e.getCause().getMessage(), e);
-                throw new RestException(Status.CONFLICT, "Concurrent modification");
-            } else {
-                log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> grantPermissionsAsync(TopicName topicUri, String role, Set<AuthAction> actions) {
+        AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            return authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/)
+                    .thenAccept(__ -> log.info("[{}] Successfully granted access for role {}: {} - topic {}",
+                            clientAppId(), role, actions, topicUri))
+                    .exceptionally(ex -> {
+                        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                        //The IllegalArgumentException and the IllegalStateException were historically thrown by the
+                        // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
+                        if (realCause instanceof MetadataStoreException.NotFoundException
+                                || realCause instanceof IllegalArgumentException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist",
+                                    clientAppId(), topicUri, realCause);
+                            throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
+                        } else if (realCause instanceof MetadataStoreException.BadVersionException
+                                || realCause instanceof IllegalStateException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: {}", clientAppId(), topicUri,
+                                    realCause.getMessage(), realCause);
+                            throw new RestException(Status.CONFLICT, "Concurrent modification");
+                        } else {
+                            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri,
+                                    realCause);
+                            throw new RestException(realCause);
+                        }
+                    });
+        } else {
+            String msg = "Authorization is not enabled";
+            return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
         }
     }
 
-    protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
+    protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
+                                                   Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                grantPermissions(topicNamePartition, role, actions);
-            }
-        }
-        grantPermissions(topicName, role, actions);
+        validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+             getPartitionedTopicMetadataAsync(topicName, true, false)
+                  .thenCompose(metadata -> {
+                      int numPartitions = metadata.partitions;
+                      CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                      if (numPartitions > 0) {
+                          for (int i = 0; i < numPartitions; i++) {
+                              TopicName topicNamePartition = topicName.getPartition(i);
+                              future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role,
+                                      actions));
+                          }
+                      }
+                      return future.thenCompose(unused -> grantPermissionsAsync(topicName, role, actions))
+                              .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
+                  }))).exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
+                    resumeAsyncResponseExceptionally(asyncResponse, realCause);
+                    return null;
+                });
     }
 
     protected void internalDeleteTopicForcefully(boolean authoritative, boolean deleteSchema) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8353756dce0..c24e0c34b40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -122,12 +122,20 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Concurrent modification") })
-    public void grantPermissionsOnTopic(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role,
+    public void grantPermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("role") String role,
             Set<AuthAction> actions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalGrantPermissionsOnTopic(role, actions);
+
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalGrantPermissionsOnTopic(asyncResponse, role, actions);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f640b1ca6de..3f12df47f4e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -163,6 +163,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
     public void grantPermissionsOnTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -174,8 +175,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Actions to be granted (produce,functions,consume)",
                     allowableValues = "produce,functions,consume")
                     Set<AuthAction> actions) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalGrantPermissionsOnTopic(role, actions);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGrantPermissionsOnTopic(asyncResponse, role, actions);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 91ec9adecc7..11bed0583b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -780,7 +780,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         // grant permission
         final Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce);
         final String role = "test-role";
-        persistentTopics.grantPermissionsOnTopic(property, cluster, namespace, topic, role, actions);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, property, cluster, namespace, topic, role, actions);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         // verify permission
         Map<String, Set<AuthAction>> permission = persistentTopics.getPermissionsOnTopic(property, cluster,
                 namespace, topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 22014fc37e4..6c72e19437f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -623,7 +623,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
         Assert.assertEquals(permissions.get(role), expectActions);
     }
@@ -659,7 +663,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role, expectActions);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
+                expectActions);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
                 partitionedTopicName);
         Assert.assertEquals(permissions.get(role), expectActions);
@@ -680,9 +689,13 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions);
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
         persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, topicName, role);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -703,7 +716,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, partitionedTopicName, role, expectActions);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
+                expectActions);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         response = mock(AsyncResponse.class);
         persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role);
         responseCaptor = ArgumentCaptor.forClass(Response.class);