You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/06 18:07:53 UTC

[pulsar] 01/02: [Authorization] Support CLEAR_BACKLOG namespace op after enable auth (#12963)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bbe65290a64d34f33f22f66c4f8adf46b28359a2
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Fri Nov 26 18:42:31 2021 +0800

    [Authorization] Support CLEAR_BACKLOG namespace op after enable auth (#12963)
    
    (cherry picked from commit 64af8df83b7463d7e9231ddabc603705f15d30d6)
---
 .../authorization/PulsarAuthorizationProvider.java |  1 +
 .../api/AuthorizationProducerConsumerTest.java     | 78 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 411c253..7be0cac 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -547,6 +547,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                 break;
             case GET_TOPICS:
             case UNSUBSCRIBE:
+            case CLEAR_BACKLOG:
                 isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData);
                 break;
             default:
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 1af36f5..603a816 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -267,7 +267,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
                 Collections.singleton(AuthAction.consume));
 
         // now, subscriptionRole have consume authorization on namespace, so it will successfully unsubscribe namespace
-        superAdmin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2);
+        sub1Admin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2);
         subscriptions = sub1Admin.topics().getSubscriptions(topicName);
         assertEquals(subscriptions.size(), 1);
 
@@ -323,6 +323,82 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
     }
 
     @Test
+    public void testClearBacklogPermission() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        setup();
+
+        final String subscriptionRole = "sub-role";
+        final String subscriptionName = "sub1";
+        final String namespace = "my-property/my-ns-sub-auth";
+        final String topicName = "persistent://" + namespace + "/my-topic";
+        Authentication adminAuthentication = new ClientAuthentication("superUser");
+
+        clientAuthProviderSupportedRoles.add(subscriptionRole);
+
+        @Cleanup
+        PulsarAdmin superAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(adminAuthentication).build());
+
+        Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
+        @Cleanup
+        PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(subAdminAuthentication).build());
+
+        superAdmin.clusters().createCluster("test",
+                ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+        superAdmin.tenants().createTenant("my-property",
+                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+        superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        superAdmin.topics().createPartitionedTopic(topicName, 1);
+
+        // grant topic consume&produce authorization to the subscriptionRole
+        superAdmin.topics().grantPermission(topicName, subscriptionRole,
+                Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+        replacePulsarClient(PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(subAdminAuthentication));
+
+        @Cleanup
+        Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName(subscriptionName)
+                .subscribe();
+
+        CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
+        for (int i = 0; i < 10; i++) {
+            completableFuture = batchProducer.sendAsync("a".getBytes());
+        }
+        completableFuture.get();
+        assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions()
+                .get(subscriptionName).getMsgBacklog(), 10);
+
+        // subscriptionRole doesn't have namespace-level authorization, so it will fail to clear backlog
+        try {
+            sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff");
+            fail("should have failed with authorization exception");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith(
+                    "Unauthorized to validateNamespaceOperation for operation [CLEAR_BACKLOG]"));
+        }
+
+        superAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,
+                Sets.newHashSet(AuthAction.consume));
+        // now, subscriptionRole have consume authorization on namespace, so it will successfully clear backlog
+        sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff");
+        assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions()
+                .get(subscriptionName).getMsgBacklog(), 0);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
     public void testSubscriptionPrefixAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);