You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/06 18:07:53 UTC
[pulsar] 01/02: [Authorization] Support CLEAR_BACKLOG namespace op after enable auth (#12963)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bbe65290a64d34f33f22f66c4f8adf46b28359a2
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Fri Nov 26 18:42:31 2021 +0800
[Authorization] Support CLEAR_BACKLOG namespace op after enable auth (#12963)
(cherry picked from commit 64af8df83b7463d7e9231ddabc603705f15d30d6)
---
.../authorization/PulsarAuthorizationProvider.java | 1 +
.../api/AuthorizationProducerConsumerTest.java | 78 +++++++++++++++++++++-
2 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 411c253..7be0cac 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -547,6 +547,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
break;
case GET_TOPICS:
case UNSUBSCRIBE:
+ case CLEAR_BACKLOG:
isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData);
break;
default:
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 1af36f5..603a816 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -267,7 +267,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
Collections.singleton(AuthAction.consume));
// now, subscriptionRole have consume authorization on namespace, so it will successfully unsubscribe namespace
- superAdmin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2);
+ sub1Admin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2);
subscriptions = sub1Admin.topics().getSubscriptions(topicName);
assertEquals(subscriptions.size(), 1);
@@ -323,6 +323,82 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
}
@Test
+ public void testClearBacklogPermission() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ // Clearing a namespace-bundle backlog must fail with only topic-level grants and pass after a namespace consume grant.
+ conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ setup();
+
+ final String subscriptionRole = "sub-role";
+ final String subscriptionName = "sub1";
+ final String namespace = "my-property/my-ns-sub-auth";
+ final String topicName = "persistent://" + namespace + "/my-topic";
+ Authentication adminAuthentication = new ClientAuthentication("superUser");
+ // NOTE(review): presumably registers the role with the test's client auth provider - confirm against the base class.
+ clientAuthProviderSupportedRoles.add(subscriptionRole);
+
+ @Cleanup
+ PulsarAdmin superAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(adminAuthentication).build());
+ // Second admin client, authenticated as subscriptionRole instead of superUser.
+ Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
+ @Cleanup
+ PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(subAdminAuthentication).build());
+ // Provision the cluster, tenant, namespace and a one-partition topic through the superUser admin client.
+ superAdmin.clusters().createCluster("test",
+ ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+ superAdmin.tenants().createTenant("my-property",
+ new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+ superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ superAdmin.topics().createPartitionedTopic(topicName, 1);
+
+ // grant topic-level consume & produce authorization to the subscriptionRole
+ superAdmin.topics().grantPermission(topicName, subscriptionRole,
+ Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+ replacePulsarClient(PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(subAdminAuthentication));
+ // Batching is disabled below; NOTE(review): presumably so each send counts as one backlog message - confirm.
+ @Cleanup
+ Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false)
+ .create();
+ // Subscribe from Earliest; this test never calls receive/acknowledge on the consumer.
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName(subscriptionName)
+ .subscribe();
+ // Send 10 messages asynchronously and block only on the future returned by the last send.
+ CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
+ for (int i = 0; i < 10; i++) {
+ completableFuture = batchProducer.sendAsync("a".getBytes());
+ }
+ completableFuture.get();
+ assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions()
+ .get(subscriptionName).getMsgBacklog(), 10);
+
+ // subscriptionRole doesn't have namespace-level authorization yet, so clearing the backlog must fail
+ try {
+ sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff");
+ fail("should have failed with authorization exception");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().startsWith(
+ "Unauthorized to validateNamespaceOperation for operation [CLEAR_BACKLOG]"));
+ }
+
+ superAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,
+ Sets.newHashSet(AuthAction.consume));
+ // now subscriptionRole has consume authorization on the namespace, so it will successfully clear the backlog
+ sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff");
+ assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions()
+ .get(subscriptionName).getMsgBacklog(), 0);
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);