You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/05 09:56:31 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] fix can not revoke permission after update topic partition (#17393)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 2d0190f9e65 [fix][broker] fix can not revoke permission after update topic partition (#17393)
2d0190f9e65 is described below

commit 2d0190f9e659650aa385f945ff09a882411e324c
Author: ken <16...@qq.com>
AuthorDate: Mon Sep 5 10:59:08 2022 +0800

    [fix][broker] fix can not revoke permission after update topic partition (#17393)
    
    * add unittest for revoke permission of topic after update topic partition
    
    * fix revoke permission of partition
    
    Co-authored-by: fanjianye <fa...@bigo.sg>
    (cherry picked from commit 8c4aad5d2d5811f5eef220692a62d38a70d52988)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 14 +++++++++-----
 .../server/ProxyWithJwtAuthorizationTest.java      | 22 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f0f8e6a17b1..dbcb44a5f38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -320,7 +320,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
+    private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role, boolean force) {
         return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
                 policiesOptional -> {
                     Policies policies = policiesOptional.orElseThrow(() ->
@@ -329,8 +329,12 @@ public class PersistentTopicsBase extends AdminResource {
                             || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
                         log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
                                 clientAppId(), role, topicUri);
-                        return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
-                                "Permissions are not set at the topic level"));
+                        if (force) {
+                            return CompletableFuture.completedFuture(null);
+                        } else {
+                            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                                    "Permissions are not set at the topic level"));
+                        }
                     }
                     // Write the new policies to metadata store
                     return namespaceResources().setPoliciesAsync(namespaceName, p -> {
@@ -356,10 +360,10 @@ public class PersistentTopicsBase extends AdminResource {
                         for (int i = 0; i < numPartitions; i++) {
                             TopicName topicNamePartition = topicName.getPartition(i);
                             future = future.thenComposeAsync(unused ->
-                                    revokePermissionsAsync(topicNamePartition.toString(), role));
+                                    revokePermissionsAsync(topicNamePartition.toString(), role, true));
                         }
                     }
-                    return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role))
+                    return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role, false))
                             .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
                 }))
             ).exceptionally(ex -> {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index ff8f45d50f8..07b71530b53 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -209,6 +209,8 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
      * 2. Update the topic partition number to 4.
      * 3. Use new producer/consumer with client role to process the topic.
      * 4. Broker should authorize producer/consumer normally.
+     * 5. revoke produce/consumer permission of topic
+     * 6. new producer/consumer should not be authorized
      * </pre>
      */
     @Test
@@ -290,6 +292,26 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
         Assert.assertEquals(messageSet, receivedMessageSet);
         consumer.close();
         producer.close();
+
+        // revoke produce/consume permission
+        admin.topics().revokePermissions(topicName, CLIENT_ROLE);
+
+        // produce/consume the topic should fail
+        try {
+            consumer = proxyClient.newConsumer()
+                    .topic(topicName)
+                    .subscriptionName(subscriptionName).subscribe();
+            Assert.fail("Should not pass");
+        } catch (PulsarClientException.AuthorizationException ex) {
+            // ok
+        }
+        try {
+            producer = proxyClient.newProducer(Schema.BYTES)
+                    .topic(topicName).create();
+            Assert.fail("Should not pass");
+        } catch (PulsarClientException.AuthorizationException ex) {
+            // ok
+        }
         log.info("-- Exiting {} test --", methodName);
     }