Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/01 05:19:29 UTC

[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16792: fix can not revoke permission after update topic partition

michaeljmarshall commented on code in PR #16792:
URL: https://github.com/apache/pulsar/pull/16792#discussion_r960227112


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenantAsync(namespaceName.getTenant())
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-             getPartitionedTopicMetadataAsync(topicName, true, false)
-                  .thenCompose(metadata -> {
-                      int numPartitions = metadata.partitions;
-                      CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
-                      if (numPartitions > 0) {
-                          for (int i = 0; i < numPartitions; i++) {
-                              TopicName topicNamePartition = topicName.getPartition(i);
-                              future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role,
-                                      actions));
-                          }
-                      }

Review Comment:
   To keep this change backwards compatible, I think we need to leave this block in place (at least in the PR that is focused on fixing the historical bug). Otherwise, custom implementations of the `AuthorizationProvider` interface would see their behavior break, which shouldn't happen in a patch release.
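
   For illustration, here is a minimal, self-contained sketch of the behavior the removed block provides: the grant is chained sequentially over every partition and then applied to the base topic, so a provider keeps receiving one call per partition. The class name `PartitionGrantSketch`, the `grant` callback (a stand-in for the call into the `AuthorizationProvider`), and the plain `String` topic names are assumptions made for this example and are not Pulsar APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch, not Pulsar code: mirrors the shape of the removed loop.
class PartitionGrantSketch {

    // Assumption: partition names are the base topic name plus "-partition-<index>".
    static CompletableFuture<Void> grantOnAll(String baseTopic, int numPartitions,
            Function<String, CompletableFuture<Void>> grant) {
        CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
        for (int i = 0; i < numPartitions; i++) {
            String partition = baseTopic + "-partition-" + i;
            // Each grant starts only after the previous one has completed.
            future = future.thenCompose(unused -> grant.apply(partition));
        }
        // Finally grant on the topic name itself, as the original code did.
        return future.thenCompose(unused -> grant.apply(baseTopic));
    }
}
```

   With `numPartitions` equal to 0 the loop is skipped and only the base topic is granted, which matches the non-partitioned case.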



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -271,6 +271,24 @@ protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissions
                         }
                     }
                 }
+
+                // If topic is partitioned, add based topic permission
+                if (topicName.isPartitioned() && auth.getTopicAuthentication().containsKey(
+                        topicName.getPartitionedTopicName())) {
+                    for (Map.Entry<String, Set<AuthAction>> entry :
+                            auth.getTopicAuthentication().get(topicName.getPartitionedTopicName()).entrySet()) {
+                        String role = entry.getKey();
+                        Set<AuthAction> topicPermissions = entry.getValue();
+
+                        if (!permissions.containsKey(role)) {
+                            permissions.put(role, topicPermissions);
+                        } else {
+                            // Do the union between namespace and topic level
+                            Set<AuthAction> union = Sets.union(permissions.get(role), topicPermissions);
+                            permissions.put(role, union);
+                        }
+                    }
+                }

Review Comment:
   I am guessing this is meant to grant permission based on a topic's partition name. This code block will be insufficient on its own. We'll need to look at the `AuthorizationProvider` to get a complete implementation.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenantAsync(namespaceName.getTenant())
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-             getPartitionedTopicMetadataAsync(topicName, true, false)
-                  .thenCompose(metadata -> {
-                      int numPartitions = metadata.partitions;
-                      CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
-                      if (numPartitions > 0) {
-                          for (int i = 0; i < numPartitions; i++) {
-                              TopicName topicNamePartition = topicName.getPartition(i);
-                              future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role,
-                                      actions));
-                          }
-                      }
-                      return future.thenCompose(unused -> grantPermissionsAsync(topicName, role, actions))
-                              .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
-                  }))).exceptionally(ex -> {
+                        grantPermissionsAsync(TopicName.get(topicName.getPartitionedTopicName()), role, actions)

Review Comment:
   Instead of passing the topic name without any of the partition information, I think we should grant permission to the resulting `topicName` and leave it up to the `AuthorizationProvider` to determine how to handle partitioned topics. The current method already calls `grantPermissionAsync` with that `topicName`, so we might be able to simply deprecate passing each partition to the `AuthorizationProvider`.
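
   As a rough sketch of what "leaving it up to the provider" could look like, the example below has a provider-like class map any partition name back to its base topic for both grants and checks. Everything here is hypothetical: `PartitionAwareAuthSketch`, `baseTopicOf`, `grant`, and `isAllowed` are illustrative names, actions are plain strings, and none of this is the real `AuthorizationProvider` interface.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Pulsar code: a provider that resolves partitions itself.
class PartitionAwareAuthSketch {
    private static final String SUFFIX = "-partition-";

    // base topic name -> (role -> allowed actions)
    private final Map<String, Map<String, Set<String>>> topicAuth = new HashMap<>();

    // Strips a trailing "-partition-<digits>" if present; otherwise returns the name unchanged.
    static String baseTopicOf(String topic) {
        int idx = topic.lastIndexOf(SUFFIX);
        if (idx < 0) {
            return topic;
        }
        String index = topic.substring(idx + SUFFIX.length());
        if (index.isEmpty() || !index.chars().allMatch(Character::isDigit)) {
            return topic;
        }
        return topic.substring(0, idx);
    }

    // Stores the grant under the base topic, whichever name the caller passed in.
    void grant(String topic, String role, Set<String> actions) {
        topicAuth.computeIfAbsent(baseTopicOf(topic), k -> new HashMap<>())
                .computeIfAbsent(role, k -> new HashSet<>())
                .addAll(actions);
    }

    // Checks the base topic entry, so every partition shares the same permissions.
    boolean isAllowed(String topic, String role, String action) {
        Map<String, Set<String>> roles = topicAuth.get(baseTopicOf(topic));
        return roles != null && roles.getOrDefault(role, Set.of()).contains(action);
    }
}
```

   With this shape, changing the number of partitions would not require touching stored permissions, because nothing is keyed by partition name.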



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenantAsync(namespaceName.getTenant())
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-             getPartitionedTopicMetadataAsync(topicName, true, false)
-                  .thenCompose(metadata -> {
-                      int numPartitions = metadata.partitions;
-                      CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
-                      if (numPartitions > 0) {
-                          for (int i = 0; i < numPartitions; i++) {
-                              TopicName topicNamePartition = topicName.getPartition(i);
-                              future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role,
-                                      actions));
-                          }
-                      }
-                      return future.thenCompose(unused -> grantPermissionsAsync(topicName, role, actions))
-                              .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
-                  }))).exceptionally(ex -> {
+                        grantPermissionsAsync(TopicName.get(topicName.getPartitionedTopicName()), role, actions)
+                                .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))))
+                .exceptionally(ex -> {
                     Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                     log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
                     resumeAsyncResponseExceptionally(asyncResponse, realCause);
                     return null;
                 });
     }
 
-    private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
+    private CompletableFuture<Void> revokePermissionsAsync(TopicName topicName, String role, int numPartitions) {
+        String topicUri = topicName.toString();
         return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
                 policiesOptional -> {
                     Policies policies = policiesOptional.orElseThrow(() ->
                             new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                    // do compatible with previous pulsar version
+                    // revoke all the partition permissions granted in previous version
+                    future = future.thenComposeAsync(unused ->
+                        namespaceResources().setPoliciesAsync(namespaceName, p -> {
+                            if (numPartitions > 0) {
+                                for (int i = 0; i < numPartitions; i++) {
+                                    p.auth_policies.getTopicAuthentication().computeIfPresent(
+                                            topicName.getPartition(i).toString(), (k, roles) -> {
+                                                roles.remove(role);
+                                                if (roles.isEmpty()) {
+                                                    return null;
+                                                }
+                                                return roles;
+                                            });
+                                }
+                            }
+                            return p;
+                        }));
                     if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
                             || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {

Review Comment:
   I wonder whether, instead of failing here, we should just attempt to remove the permission on every partition, even if it isn't in the map. I think that would be the expected behavior. This would also be backwards compatible.
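
   A minimal sketch of that idea, under stated assumptions: revocation is attempted on every partition and on the base topic, and a missing entry is simply a no-op rather than an error. The nested map stands in for `auth_policies.getTopicAuthentication()`, actions are plain strings, and `RevokeSketch` with its two methods is a hypothetical helper, not existing Pulsar code.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Pulsar code: revoke that tolerates missing entries.
class RevokeSketch {

    // Attempts the removal on each partition and then on the base topic.
    // Assumption: partition names are the base topic name plus "-partition-<index>".
    static void revokeOnAllPartitions(Map<String, Map<String, Set<String>>> topicAuth,
            String baseTopic, int numPartitions, String role) {
        for (int i = 0; i < numPartitions; i++) {
            removeRole(topicAuth, baseTopic + "-partition-" + i, role);
        }
        removeRole(topicAuth, baseTopic, role);
    }

    // Removes the role if the topic has an entry; drops the topic entry once it is empty.
    // Does nothing when the topic or the role is absent.
    static void removeRole(Map<String, Map<String, Set<String>>> topicAuth,
            String topic, String role) {
        topicAuth.computeIfPresent(topic, (k, roles) -> {
            roles.remove(role);
            return roles.isEmpty() ? null : roles;
        });
    }
}
```

   Because each removal is a no-op when nothing is stored, calling this twice, or calling it after the partition count has changed, leaves the map in the same state.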


