You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/09 01:27:53 UTC

[GitHub] [pulsar] mattisonchao commented on a change in pull request #14152: [Broker]make grantPermissionsOnTopic method async

mattisonchao commented on a change in pull request #14152:
URL: https://github.com/apache/pulsar/pull/14152#discussion_r802193345



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -262,53 +262,68 @@ public void validateAdminOperationOnTopic(boolean authoritative) {
         validateTopicOwnership(topicName, authoritative);
     }
 
-    private void grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
-        try {
-            AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
-            if (null != authService) {
-                authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/).get();
-            } else {
-                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
-            }
-            log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
-                    topicUri);
-        } catch (InterruptedException e) {
-            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            // The IllegalArgumentException and the IllegalStateException were historically thrown by the
-            // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
-            if (e.getCause() instanceof MetadataStoreException.NotFoundException
-                    || e.getCause() instanceof IllegalArgumentException) {
-                log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist", clientAppId(),
-                        topicUri, e);
-                throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
-            } else if (e.getCause() instanceof MetadataStoreException.BadVersionException
-                    || e.getCause() instanceof IllegalStateException) {
-                log.warn("[{}] Failed to set permissions for topic {}: {}",
-                        clientAppId(), topicUri, e.getCause().getMessage(), e);
-                throw new RestException(Status.CONFLICT, "Concurrent modification");
-            } else {
-                log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
+        AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            return authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/)
+                    .thenAccept(__ -> log.info("[{}] Successfully granted access for role {}: {} - topic {}",
+                            clientAppId(), role, actions, topicUri))
+                    .exceptionally(ex -> {
+                        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                        //The IllegalArgumentException and the IllegalStateException were historically thrown by the
+                        // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
+                        if (realCause instanceof MetadataStoreException.NotFoundException
+                                || realCause instanceof IllegalArgumentException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist",
+                                    clientAppId(), topicUri, realCause);
+                            throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
+                        } else if (realCause instanceof MetadataStoreException.BadVersionException
+                                || realCause instanceof IllegalStateException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: {}", clientAppId(), topicUri,
+                                    realCause.getMessage(), realCause);
+                            throw new RestException(Status.CONFLICT, "Concurrent modification");
+                        } else {
+                            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri,
+                                    realCause);
+                            throw new RestException(realCause);
+                        }
+                    });
+        } else {
+            String msg = "Authorization is not enabled";
+            log.error("[{}] Failed to get permissions for topic {}, because  {}", clientAppId(), topicUri, msg);
+            return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
         }
     }
 
-    protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
+    protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
+                                                   Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());

Review comment:
       I think this method can be async.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -262,53 +262,68 @@ public void validateAdminOperationOnTopic(boolean authoritative) {
         validateTopicOwnership(topicName, authoritative);
     }
 
-    private void grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
-        try {
-            AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
-            if (null != authService) {
-                authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/).get();
-            } else {
-                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
-            }
-            log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
-                    topicUri);
-        } catch (InterruptedException e) {
-            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            // The IllegalArgumentException and the IllegalStateException were historically thrown by the
-            // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
-            if (e.getCause() instanceof MetadataStoreException.NotFoundException
-                    || e.getCause() instanceof IllegalArgumentException) {
-                log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist", clientAppId(),
-                        topicUri, e);
-                throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
-            } else if (e.getCause() instanceof MetadataStoreException.BadVersionException
-                    || e.getCause() instanceof IllegalStateException) {
-                log.warn("[{}] Failed to set permissions for topic {}: {}",
-                        clientAppId(), topicUri, e.getCause().getMessage(), e);
-                throw new RestException(Status.CONFLICT, "Concurrent modification");
-            } else {
-                log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
+        AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            return authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/)
+                    .thenAccept(__ -> log.info("[{}] Successfully granted access for role {}: {} - topic {}",
+                            clientAppId(), role, actions, topicUri))
+                    .exceptionally(ex -> {
+                        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                        //The IllegalArgumentException and the IllegalStateException were historically thrown by the
+                        // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
+                        if (realCause instanceof MetadataStoreException.NotFoundException
+                                || realCause instanceof IllegalArgumentException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist",
+                                    clientAppId(), topicUri, realCause);
+                            throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
+                        } else if (realCause instanceof MetadataStoreException.BadVersionException
+                                || realCause instanceof IllegalStateException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: {}", clientAppId(), topicUri,
+                                    realCause.getMessage(), realCause);
+                            throw new RestException(Status.CONFLICT, "Concurrent modification");
+                        } else {
+                            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri,
+                                    realCause);
+                            throw new RestException(realCause);
+                        }
+                    });
+        } else {
+            String msg = "Authorization is not enabled";
+            log.error("[{}] Failed to get permissions for topic {}, because  {}", clientAppId(), topicUri, msg);
+            return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
         }
     }
 
-    protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
+    protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
+                                                   Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                grantPermissions(topicNamePartition, role, actions);
-            }
-        }
-        grantPermissions(topicName, role, actions);
+        validatePoliciesReadOnlyAccessAsync().thenCompose(__ ->
+             getPartitionedTopicMetadataAsync(topicName, true, false)
+                  .thenCompose(metadata -> {
+                      int numPartitions = metadata.partitions;
+                      CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                      if (numPartitions > 0) {
+                          for (int i = 0; i < numPartitions; i++) {
+                              TopicName topicNamePartition = topicName.getPartition(i);
+                              future = future.thenComposeAsync(unused -> grantPermissions(topicNamePartition, role,
+                                      actions));
+                          }
+                      }
+                      return future.thenComposeAsync(unused -> grantPermissions(topicName, role, actions))

Review comment:
       Why ``ComposeAsync``?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -120,12 +120,20 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Concurrent modification") })
-    public void grantPermissionsOnTopic(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role,
+    public void grantPermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("role") String role,
             Set<AuthAction> actions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalGrantPermissionsOnTopic(role, actions);
+
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalGrantPermissionsOnTopic(asyncResponse, role, actions);
+        } catch (WebApplicationException wae) {

Review comment:
       Maybe it's enough that we just capture validateTopicName ?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -262,53 +262,68 @@ public void validateAdminOperationOnTopic(boolean authoritative) {
         validateTopicOwnership(topicName, authoritative);
     }
 
-    private void grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
-        try {
-            AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
-            if (null != authService) {
-                authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/).get();
-            } else {
-                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
-            }
-            log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
-                    topicUri);
-        } catch (InterruptedException e) {
-            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            // The IllegalArgumentException and the IllegalStateException were historically thrown by the
-            // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
-            if (e.getCause() instanceof MetadataStoreException.NotFoundException
-                    || e.getCause() instanceof IllegalArgumentException) {
-                log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist", clientAppId(),
-                        topicUri, e);
-                throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
-            } else if (e.getCause() instanceof MetadataStoreException.BadVersionException
-                    || e.getCause() instanceof IllegalStateException) {
-                log.warn("[{}] Failed to set permissions for topic {}: {}",
-                        clientAppId(), topicUri, e.getCause().getMessage(), e);
-                throw new RestException(Status.CONFLICT, "Concurrent modification");
-            } else {
-                log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
+        AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            return authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/)
+                    .thenAccept(__ -> log.info("[{}] Successfully granted access for role {}: {} - topic {}",
+                            clientAppId(), role, actions, topicUri))
+                    .exceptionally(ex -> {
+                        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                        //The IllegalArgumentException and the IllegalStateException were historically thrown by the
+                        // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
+                        if (realCause instanceof MetadataStoreException.NotFoundException
+                                || realCause instanceof IllegalArgumentException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist",
+                                    clientAppId(), topicUri, realCause);
+                            throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
+                        } else if (realCause instanceof MetadataStoreException.BadVersionException
+                                || realCause instanceof IllegalStateException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: {}", clientAppId(), topicUri,
+                                    realCause.getMessage(), realCause);
+                            throw new RestException(Status.CONFLICT, "Concurrent modification");
+                        } else {
+                            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri,
+                                    realCause);
+                            throw new RestException(realCause);
+                        }
+                    });
+        } else {
+            String msg = "Authorization is not enabled";
+            log.error("[{}] Failed to get permissions for topic {}, because  {}", clientAppId(), topicUri, msg);
+            return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
         }
     }
 
-    protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
+    protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
+                                                   Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                grantPermissions(topicNamePartition, role, actions);
-            }
-        }
-        grantPermissions(topicName, role, actions);
+        validatePoliciesReadOnlyAccessAsync().thenCompose(__ ->
+             getPartitionedTopicMetadataAsync(topicName, true, false)
+                  .thenCompose(metadata -> {
+                      int numPartitions = metadata.partitions;
+                      CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                      if (numPartitions > 0) {
+                          for (int i = 0; i < numPartitions; i++) {
+                              TopicName topicNamePartition = topicName.getPartition(i);
+                              future = future.thenComposeAsync(unused -> grantPermissions(topicNamePartition, role,

Review comment:
       why don't directly return this ``future``? 

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -172,8 +173,14 @@ public void grantPermissionsOnTopic(
             @ApiParam(value = "Actions to be granted (produce,functions,consume)",
                     allowableValues = "produce,functions,consume")
                     Set<AuthAction> actions) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalGrantPermissionsOnTopic(role, actions);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGrantPermissionsOnTopic(asyncResponse, role, actions);
+        } catch (WebApplicationException wae) {

Review comment:
       Maybe it's enough that we just capture validateTopicName ?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -262,53 +262,68 @@ public void validateAdminOperationOnTopic(boolean authoritative) {
         validateTopicOwnership(topicName, authoritative);
     }
 
-    private void grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
-        try {
-            AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
-            if (null != authService) {
-                authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/).get();
-            } else {
-                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
-            }
-            log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
-                    topicUri);
-        } catch (InterruptedException e) {
-            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            // The IllegalArgumentException and the IllegalStateException were historically thrown by the
-            // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
-            if (e.getCause() instanceof MetadataStoreException.NotFoundException
-                    || e.getCause() instanceof IllegalArgumentException) {
-                log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist", clientAppId(),
-                        topicUri, e);
-                throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
-            } else if (e.getCause() instanceof MetadataStoreException.BadVersionException
-                    || e.getCause() instanceof IllegalStateException) {
-                log.warn("[{}] Failed to set permissions for topic {}: {}",
-                        clientAppId(), topicUri, e.getCause().getMessage(), e);
-                throw new RestException(Status.CONFLICT, "Concurrent modification");
-            } else {
-                log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
+        AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            return authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/)
+                    .thenAccept(__ -> log.info("[{}] Successfully granted access for role {}: {} - topic {}",
+                            clientAppId(), role, actions, topicUri))
+                    .exceptionally(ex -> {
+                        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                        //The IllegalArgumentException and the IllegalStateException were historically thrown by the
+                        // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
+                        if (realCause instanceof MetadataStoreException.NotFoundException
+                                || realCause instanceof IllegalArgumentException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist",
+                                    clientAppId(), topicUri, realCause);
+                            throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
+                        } else if (realCause instanceof MetadataStoreException.BadVersionException
+                                || realCause instanceof IllegalStateException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: {}", clientAppId(), topicUri,
+                                    realCause.getMessage(), realCause);
+                            throw new RestException(Status.CONFLICT, "Concurrent modification");
+                        } else {
+                            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri,
+                                    realCause);
+                            throw new RestException(realCause);
+                        }
+                    });
+        } else {
+            String msg = "Authorization is not enabled";
+            log.error("[{}] Failed to get permissions for topic {}, because  {}", clientAppId(), topicUri, msg);
+            return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
         }
     }
 
-    protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
+    protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
+                                                   Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                grantPermissions(topicNamePartition, role, actions);
-            }
-        }
-        grantPermissions(topicName, role, actions);
+        validatePoliciesReadOnlyAccessAsync().thenCompose(__ ->
+             getPartitionedTopicMetadataAsync(topicName, true, false)
+                  .thenCompose(metadata -> {
+                      int numPartitions = metadata.partitions;
+                      CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                      if (numPartitions > 0) {
+                          for (int i = 0; i < numPartitions; i++) {
+                              TopicName topicNamePartition = topicName.getPartition(i);
+                              future = future.thenComposeAsync(unused -> grantPermissions(topicNamePartition, role,
+                                      actions));
+                          }
+                      }
+                      return future.thenComposeAsync(unused -> grantPermissions(topicName, role, actions))
+                              .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))
+                              .exceptionally(ex -> {

Review comment:
       it seems we can combine this ``exceptionally`` to line 321

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -262,53 +262,68 @@ public void validateAdminOperationOnTopic(boolean authoritative) {
         validateTopicOwnership(topicName, authoritative);
     }
 
-    private void grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
-        try {
-            AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
-            if (null != authService) {
-                authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/).get();
-            } else {
-                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
-            }
-            log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), role, actions,
-                    topicUri);
-        } catch (InterruptedException e) {
-            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            // The IllegalArgumentException and the IllegalStateException were historically thrown by the
-            // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
-            if (e.getCause() instanceof MetadataStoreException.NotFoundException
-                    || e.getCause() instanceof IllegalArgumentException) {
-                log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist", clientAppId(),
-                        topicUri, e);
-                throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
-            } else if (e.getCause() instanceof MetadataStoreException.BadVersionException
-                    || e.getCause() instanceof IllegalStateException) {
-                log.warn("[{}] Failed to set permissions for topic {}: {}",
-                        clientAppId(), topicUri, e.getCause().getMessage(), e);
-                throw new RestException(Status.CONFLICT, "Concurrent modification");
-            } else {
-                log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
+        AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            return authService.grantPermissionAsync(topicUri, actions, role, null/*additional auth-data json*/)
+                    .thenAccept(__ -> log.info("[{}] Successfully granted access for role {}: {} - topic {}",
+                            clientAppId(), role, actions, topicUri))
+                    .exceptionally(ex -> {
+                        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                        //The IllegalArgumentException and the IllegalStateException were historically thrown by the
+                        // grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
+                        if (realCause instanceof MetadataStoreException.NotFoundException
+                                || realCause instanceof IllegalArgumentException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist",
+                                    clientAppId(), topicUri, realCause);
+                            throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
+                        } else if (realCause instanceof MetadataStoreException.BadVersionException
+                                || realCause instanceof IllegalStateException) {
+                            log.warn("[{}] Failed to set permissions for topic {}: {}", clientAppId(), topicUri,
+                                    realCause.getMessage(), realCause);
+                            throw new RestException(Status.CONFLICT, "Concurrent modification");
+                        } else {
+                            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri,
+                                    realCause);
+                            throw new RestException(realCause);
+                        }
+                    });
+        } else {
+            String msg = "Authorization is not enabled";
+            log.error("[{}] Failed to get permissions for topic {}, because  {}", clientAppId(), topicUri, msg);
+            return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
         }
     }
 
-    protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
+    protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
+                                                   Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                grantPermissions(topicNamePartition, role, actions);
-            }
-        }
-        grantPermissions(topicName, role, actions);
+        validatePoliciesReadOnlyAccessAsync().thenCompose(__ ->
+             getPartitionedTopicMetadataAsync(topicName, true, false)
+                  .thenCompose(metadata -> {
+                      int numPartitions = metadata.partitions;
+                      CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                      if (numPartitions > 0) {
+                          for (int i = 0; i < numPartitions; i++) {
+                              TopicName topicNamePartition = topicName.getPartition(i);
+                              future = future.thenComposeAsync(unused -> grantPermissions(topicNamePartition, role,

Review comment:
       why ``ComposeAsync``? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org