You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/09 01:43:44 UTC

[GitHub] [pulsar] mattisonchao commented on a change in pull request #14149: [Broker]make revokePermissionsOnTopic method async

mattisonchao commented on a change in pull request #14149:
URL: https://github.com/apache/pulsar/pull/14149#discussion_r802201203



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -327,49 +327,61 @@ protected void internalDeleteTopicForcefully(boolean authoritative, boolean dele
         }
     }
 
-    private void revokePermissions(String topicUri, String role) {
+    private CompletableFuture<Void> revokePermissions(String topicUri, String role) {
         Policies policies;
         try {
             policies = namespaceResources().getPolicies(namespaceName)
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
         } catch (Exception e) {
             log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(e));
         }
         if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
                 || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
             log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(),
                     role, topicUri);
-            throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
-        }
-        try {
-            // Write the new policies to metadata store
-            namespaceResources().setPolicies(namespaceName, p -> {
-                p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
-                return p;
-            });
-            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri);
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Permissions are not set at the topic level"));
         }
 
+        // Write the new policies to metadata store
+         return namespaceResources().setPoliciesAsync(namespaceName, p -> {
+            p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+            return p;
+        }).thenAccept(__ -> log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
+                 topicUri)
+
+        ).exceptionally(ex -> {

Review comment:
       It may be better to let the upper layer call to handle the exception,

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -327,49 +327,61 @@ protected void internalDeleteTopicForcefully(boolean authoritative, boolean dele
         }
     }
 
-    private void revokePermissions(String topicUri, String role) {
+    private CompletableFuture<Void> revokePermissions(String topicUri, String role) {
         Policies policies;
         try {
             policies = namespaceResources().getPolicies(namespaceName)
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
         } catch (Exception e) {
             log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(e));
         }
         if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
                 || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
             log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(),
                     role, topicUri);
-            throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
-        }
-        try {
-            // Write the new policies to metadata store
-            namespaceResources().setPolicies(namespaceName, p -> {
-                p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
-                return p;
-            });
-            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri);
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Permissions are not set at the topic level"));
         }
 
+        // Write the new policies to metadata store
+         return namespaceResources().setPoliciesAsync(namespaceName, p -> {
+            p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+            return p;
+        }).thenAccept(__ -> log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
+                 topicUri)
+
+        ).exceptionally(ex -> {
+            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+            log.error("[{}] Failed revoke access for role {} - topic {}", clientAppId(), role, topicUri, realCause);
+            throw new RestException(realCause);
+
+        });
     }
 
-    protected void internalRevokePermissionsOnTopic(String role) {
+    protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                revokePermissions(topicNamePartition.toString(), role);
-            }
-        }
-        revokePermissions(topicName.toString(), role);
+        validatePoliciesReadOnlyAccessAsync().thenCompose(__ ->
+            getPartitionedTopicMetadataAsync(topicName, true, false)
+                .thenCompose(metadata -> {
+                    int numPartitions = metadata.partitions;
+                    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                    if (numPartitions > 0) {
+                        for (int i = 0; i < numPartitions; i++) {
+                            TopicName topicNamePartition = topicName.getPartition(i);
+                            future = future.thenComposeAsync(unused -> revokePermissions(topicNamePartition.toString(),
+                                    role));
+                        }
+                    }
+                    return future.thenComposeAsync(unused -> revokePermissions(topicName.toString(), role))

Review comment:
       Why use ``ComposeAsync``? 

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -139,11 +139,18 @@ public void grantPermissionsOnTopic(@PathParam("property") String property,
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Permissions are not set at the topic level")})
-    public void revokePermissionsOnTopic(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalRevokePermissionsOnTopic(role);
+    public void revokePermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("role") String role) {
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);

Review comment:
       Maybe it's enough that we just capture ``validateTopicName``?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -327,49 +327,61 @@ protected void internalDeleteTopicForcefully(boolean authoritative, boolean dele
         }
     }
 
-    private void revokePermissions(String topicUri, String role) {
+    private CompletableFuture<Void> revokePermissions(String topicUri, String role) {
         Policies policies;
         try {
             policies = namespaceResources().getPolicies(namespaceName)

Review comment:
       This method can be async

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -327,49 +327,61 @@ protected void internalDeleteTopicForcefully(boolean authoritative, boolean dele
         }
     }
 
-    private void revokePermissions(String topicUri, String role) {
+    private CompletableFuture<Void> revokePermissions(String topicUri, String role) {
         Policies policies;
         try {
             policies = namespaceResources().getPolicies(namespaceName)
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
         } catch (Exception e) {
             log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(e));
         }
         if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
                 || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
             log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(),
                     role, topicUri);
-            throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
-        }
-        try {
-            // Write the new policies to metadata store
-            namespaceResources().setPolicies(namespaceName, p -> {
-                p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
-                return p;
-            });
-            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri);
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Permissions are not set at the topic level"));
         }
 
+        // Write the new policies to metadata store
+         return namespaceResources().setPoliciesAsync(namespaceName, p -> {
+            p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+            return p;
+        }).thenAccept(__ -> log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
+                 topicUri)
+
+        ).exceptionally(ex -> {
+            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+            log.error("[{}] Failed revoke access for role {} - topic {}", clientAppId(), role, topicUri, realCause);
+            throw new RestException(realCause);
+
+        });
     }
 
-    protected void internalRevokePermissionsOnTopic(String role) {
+    protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());

Review comment:
       this method can by async.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -327,49 +327,61 @@ protected void internalDeleteTopicForcefully(boolean authoritative, boolean dele
         }
     }
 
-    private void revokePermissions(String topicUri, String role) {
+    private CompletableFuture<Void> revokePermissions(String topicUri, String role) {
         Policies policies;
         try {
             policies = namespaceResources().getPolicies(namespaceName)
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
         } catch (Exception e) {
             log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(e));
         }
         if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
                 || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
             log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(),
                     role, topicUri);
-            throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
-        }
-        try {
-            // Write the new policies to metadata store
-            namespaceResources().setPolicies(namespaceName, p -> {
-                p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
-                return p;
-            });
-            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri);
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Permissions are not set at the topic level"));
         }
 
+        // Write the new policies to metadata store
+         return namespaceResources().setPoliciesAsync(namespaceName, p -> {
+            p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+            return p;
+        }).thenAccept(__ -> log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
+                 topicUri)
+
+        ).exceptionally(ex -> {
+            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+            log.error("[{}] Failed revoke access for role {} - topic {}", clientAppId(), role, topicUri, realCause);
+            throw new RestException(realCause);
+
+        });
     }
 
-    protected void internalRevokePermissionsOnTopic(String role) {
+    protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                revokePermissions(topicNamePartition.toString(), role);
-            }
-        }
-        revokePermissions(topicName.toString(), role);
+        validatePoliciesReadOnlyAccessAsync().thenCompose(__ ->
+            getPartitionedTopicMetadataAsync(topicName, true, false)
+                .thenCompose(metadata -> {
+                    int numPartitions = metadata.partitions;
+                    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                    if (numPartitions > 0) {
+                        for (int i = 0; i < numPartitions; i++) {
+                            TopicName topicNamePartition = topicName.getPartition(i);
+                            future = future.thenComposeAsync(unused -> revokePermissions(topicNamePartition.toString(),

Review comment:
       Why use ``ComposeAsync``? 

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -198,8 +199,14 @@ public void revokePermissionsOnTopic(
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Client role to which grant permissions", required = true)
             @PathParam("role") String role) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalRevokePermissionsOnTopic(role);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalRevokePermissionsOnTopic(asyncResponse, role);
+        } catch (WebApplicationException wae) {

Review comment:
       Maybe it's enough that we just capture validateTopicName?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org