You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/24 03:44:54 UTC

[GitHub] [pulsar] zymap commented on a change in pull request #14747: [cleanup][broker]: Refactor PulsarAuthorizationProvider.

zymap commented on a change in pull request #14747:
URL: https://github.com/apache/pulsar/pull/14747#discussion_r833887459



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
##########
@@ -178,36 +160,20 @@ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResource
     @Override
     public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
             AuthenticationDataSource authenticationData) {
-        CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
-        canProduceAsync(topicName, role, authenticationData).whenComplete((produceAuthorized, ex) -> {
-            if (ex == null) {
-                if (produceAuthorized) {
-                    finalResult.complete(produceAuthorized);
-                    return;
-                }
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug(
-                            "Topic [{}] Role [{}] exception occurred while trying to check Produce permissions. {}",
-                            topicName.toString(), role, ex.getMessage());
-                }
-            }
-            canConsumeAsync(topicName, role, authenticationData, null).whenComplete((consumeAuthorized, e)
-                    -> {
-                if (e == null) {
-                    finalResult.complete(consumeAuthorized);
-                } else {
+        return canProduceAsync(topicName, role, authenticationData)
+                .thenCompose(canProduce -> {
+                    if (canProduce) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    return canConsumeAsync(topicName, role, authenticationData, null);
+                }).exceptionally(ex -> {
                     if (log.isDebugEnabled()) {
-                        log.debug(
-                                "Topic [{}] Role [{}] exception occurred while trying to check Consume permissions. {}",
-                                topicName.toString(), role, e.getMessage());
+                        log.debug("Topic [{}] Role [{}] exception occurred while trying to check produce/consume"
+                                + " permissions. {}", topicName.toString(), role, ex.getMessage());
 
                     }
-                    finalResult.completeExceptionally(e);
-                }
-            });
-        });
-        return finalResult;
+                    return null;

Review comment:
       Looks like this will change the original behaviors. We used to return an exceptional CompletableFuture so the caller will know the CompletableFuture is failed. If we return null in the `exceptionally` method, the caller will receive null and if there has someone use it, it will cause a NPE.

##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
##########
@@ -231,185 +197,149 @@ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResource
     private CompletableFuture<Boolean> allowConsumeOrProduceOpsAsync(NamespaceName namespaceName,
                                                                      String role,
                                                                      AuthenticationDataSource authenticationData) {
-        CompletableFuture<Boolean> finalResult = new CompletableFuture<>();
-        allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.consume)
-                .whenComplete((consumeAuthorized, e) -> {
-                    if (e == null) {
-                        if (consumeAuthorized) {
-                            finalResult.complete(consumeAuthorized);
-                            return;
-                        }
-                    } else {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Namespace [{}] Role [{}] exception occurred while trying to check Consume "
-                                    + "permission. {}", namespaceName, role, e.getCause());
-                        }
+        return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.consume)
+                .thenCompose(canConsumer -> {
+                    if (canConsumer) {
+                        return CompletableFuture.completedFuture(true);
                     }
-                    allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.produce)
-                            .whenComplete((produceAuthorized, ex) -> {
-                                if (ex == null) {
-                                    finalResult.complete(produceAuthorized);
-                                } else {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("Namespace [{}] Role [{}] exception occurred while trying to check "
-                                                + "Produce permission. {}", namespaceName, role, ex.getCause());
-                                    }
-                                    finalResult.completeExceptionally(ex.getCause());
-                                }
-                            });
+                    return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.produce);
                 });
-
-        return finalResult;
     }
 
     private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String role,
                                                                        AuthenticationDataSource authenticationData,
                                                                        AuthAction authAction) {
-        CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
-        try {
-            pulsarResources.getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(policies -> {
-                if (!policies.isPresent()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
-                    }
-                } else {
-                    Map<String, Set<AuthAction>> namespaceRoles = policies.get()
-                            .auth_policies.getNamespaceAuthentication();
-                    Set<AuthAction> namespaceActions = namespaceRoles.get(role);
-                    if (namespaceActions != null && namespaceActions.contains(authAction)) {
-                        // The role has namespace level permission
-                        permissionFuture.complete(true);
-                        return;
-                    }
+        return pulsarResources.getNamespaceResources().getPoliciesAsync(namespaceName).thenApply(policies -> {
+            if (!policies.isPresent()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
+                }
+            } else {
+                Map<String, Set<AuthAction>> namespaceRoles = policies.get()
+                        .auth_policies.getNamespaceAuthentication();
+                Set<AuthAction> namespaceActions = namespaceRoles.get(role);
+                if (namespaceActions != null && namespaceActions.contains(authAction)) {
+                    // The role has namespace level permission
+                    return true;
+                }
 
-                    // Using wildcard
-                    if (conf.isAuthorizationAllowWildcardsMatching()) {
-                        if (checkWildcardPermission(role, authAction, namespaceRoles)) {
-                            // The role has namespace level permission by wildcard match
-                            permissionFuture.complete(true);
-                            return;
-                        }
+                // Using wildcard
+                if (conf.isAuthorizationAllowWildcardsMatching()) {
+                    if (checkWildcardPermission(role, authAction, namespaceRoles)) {
+                        // The role has namespace level permission by wildcard match
+                        return true;
                     }
                 }
-                permissionFuture.complete(false);
-            }).exceptionally(ex -> {
-                log.warn("Client  with Role - {} failed to get permissions for namespace - {}. {}", role, namespaceName,
-                        ex.getMessage());
-                permissionFuture.completeExceptionally(ex);
-                return null;
-            });
-        } catch (Exception e) {
-            log.warn("Client  with Role - {} failed to get permissions for namespace - {}. {}", role, namespaceName,
-                    e.getMessage());
-            permissionFuture.completeExceptionally(e);
-        }
-        return permissionFuture;
+            }
+            return false;
+        });
     }
 
     @Override
     public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions,
-            String role, String authDataJson) {
-        try {
-            validatePoliciesReadOnlyAccess();
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
+                                                        String role, String authDataJson) {
 
-        String topicUri = topicName.toString();
-        return pulsarResources.getNamespaceResources()
-                .setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
-                    policies.auth_policies.getTopicAuthentication()
-                            .computeIfAbsent(topicUri, __ -> new HashMap<>())
-                            .put(role, actions);
-                    return policies;
-                }).whenComplete((__, throwable) -> {
-                    if (throwable != null) {
-                        log.error("[{}] Failed to set permissions for role {} on topic {}", role, role, topicName,
-                                throwable);
-                    } else {
-                        log.info("[{}] Successfully granted access for role {}: {} - topic {}", role, role, actions,
-                                topicUri);
-                    }
-                });
+        return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+            if (readonly) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Policies are read-only. Broker cannot do read-write operations");
+                }
+                throw new IllegalStateException("policies are in readonly mode");
+            }
+            String topicUri = topicName.toString();
+            return pulsarResources.getNamespaceResources()
+                    .setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
+                        policies.auth_policies.getTopicAuthentication()
+                                .computeIfAbsent(topicUri, __ -> new HashMap<>())
+                                .put(role, actions);
+                        return policies;
+                    }).whenComplete((__, ex) -> {
+                        if (ex != null) {
+                            log.error("Failed to set permissions for role {} on topic {}", role, topicName, ex);
+                        } else {
+                            log.info("Successfully granted access for role {}: {} - topic {}", role, actions, topicUri);
+                        }
+                    });
+        });
     }
 
     @Override
     public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName, Set<AuthAction> actions,
-            String role, String authDataJson) {
-        try {
-            validatePoliciesReadOnlyAccess();
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-
-        return pulsarResources.getNamespaceResources()
-                .setPoliciesAsync(namespaceName, policies -> {
-                    policies.auth_policies.getNamespaceAuthentication().put(role, actions);
-                    return policies;
-                }).whenComplete((__, throwable) -> {
-                    if (throwable != null) {
-                        log.error("[{}] Failed to set permissions for role {} namespace {}", role, role, namespaceName,
-                                throwable);
-                    } else {
-                        log.info("[{}] Successfully granted access for role {}: {} - namespace {}", role, role, actions,
-                                namespaceName);
-                    }
-                });
+                                                        String role, String authDataJson) {
+        return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+            if (readonly) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Policies are read-only. Broker cannot do read-write operations");
+                }
+                throw new IllegalStateException("policies are in readonly mode");

Review comment:
       If no one catches the exception, the future will never be complete. For example:
   
   ```
           CompletableFuture future = new CompletableFuture();
           future.thenRun(() -> {
               System.out.println("first thing");
           }).thenCompose(b -> {
               throw new IllegalArgumentException("fail future");
           }).thenRun(() -> {
               System.out.println("other things");
           });
           future.get();
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org