You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/04/26 06:05:36 UTC

(pulsar) branch master updated: [improve][admin] Check if the topic existed before the permission operations (#22547)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 69a600e86bb [improve][admin] Check if the topic existed before the permission operations (#22547)
69a600e86bb is described below

commit 69a600e86bb5110a118d836125411e941b83764d
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Apr 26 14:05:30 2024 +0800

    [improve][admin] Check if the topic existed before the permission operations (#22547)
---
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java      |  9 ++++++---
 .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java     |  1 +
 .../java/org/apache/pulsar/broker/admin/AdminApiTest.java   | 12 ++++++++++++
 .../apache/pulsar/broker/admin/PersistentTopicsTest.java    | 10 ++++++++--
 .../org/apache/pulsar/broker/auth/AuthorizationTest.java    | 13 ++++++++-----
 .../client/api/AuthenticatedProducerConsumerTest.java       |  4 +++-
 .../client/api/AuthorizationProducerConsumerTest.java       |  2 ++
 .../pulsar/websocket/proxy/ProxyAuthorizationTest.java      |  8 +++++---
 8 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 63ea987bb07..682f41dcdb6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -205,6 +205,7 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         return validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> internalCheckTopicExists(topicName))
                 .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName));
     }
 
@@ -256,9 +257,10 @@ public class PersistentTopicsBase extends AdminResource {
                                                    Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenantAsync(namespaceName.getTenant())
-                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-                        grantPermissionsAsync(topicName, role, actions)
-                                .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))))
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> internalCheckTopicExists(topicName))
+                .thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions))
+                .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
                     Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                     log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
@@ -270,6 +272,7 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> internalCheckTopicExists(topicName))
                 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false)
                 .thenCompose(metadata -> {
                     int numPartitions = metadata.partitions;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index e89b4ff5e83..2dcb930fbe7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
                 .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
                 .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN)
                 .build();
+        admin.topics().createNonPartitionedTopic(topicName);
         admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
         admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index b28cfc98fdb..635b2c25bc1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -3698,4 +3698,16 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         });
 
     }
+
+    @Test
+    @SneakyThrows
+    public void testPermissions() {
+        String namespace = "prop-xyz/ns1/";
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://" + namespace + random;
+        final String subject =  UUID.randomUUID().toString();
+        assertThrows(NotFoundException.class, () -> admin.topics().getPermissions(topic));
+        assertThrows(NotFoundException.class, () -> admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce)));
+        assertThrows(NotFoundException.class, () -> admin.topics().revokePermissions(topic, subject));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index c588051a0fe..55b4c6e1c6f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -890,12 +890,15 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testGrantNonPartitionedTopic() {
         final String topicName = "non-partitioned-topic";
         AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
         response = mock(AsyncResponse.class);
-        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
         persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -957,12 +960,15 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testRevokeNonPartitionedTopic() {
         final String topicName = "non-partitioned-topic";
         AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
         response = mock(AsyncResponse.class);
-        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
         persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index f59f9d480b8..6c913d42908 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -58,6 +58,7 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
     public void setup() throws Exception {
         conf.setClusterName("c1");
         conf.setSystemTopicEnabled(false);
+        conf.setForceDeleteNamespaceAllowed(true);
         conf.setAuthenticationEnabled(true);
         conf.setForceDeleteNamespaceAllowed(true);
         conf.setForceDeleteTenantAllowed(true);
@@ -107,8 +108,9 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
         assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
         assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
 
-        admin.topics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
-                EnumSet.of(AuthAction.consume));
+        String topic = "persistent://p1/c1/ns1/ds2";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().grantPermission(topic, "other-role", EnumSet.of(AuthAction.consume));
         waitForChange();
 
         assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
@@ -178,8 +180,9 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
         assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "my.role.1", null));
         assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "my.role.2", null));
 
-        admin.topics().grantPermission("persistent://p1/c1/ns1/ds1", "my.*",
-                EnumSet.of(AuthAction.produce));
+        String topic1 = "persistent://p1/c1/ns1/ds1";
+        admin.topics().createNonPartitionedTopic(topic1);
+        admin.topics().grantPermission(topic1, "my.*", EnumSet.of(AuthAction.produce));
         waitForChange();
 
         assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null));
@@ -242,7 +245,7 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
         assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2"));
         assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1"));
 
-        admin.namespaces().deleteNamespace("p1/c1/ns1");
+        admin.namespaces().deleteNamespace("p1/c1/ns1", true);
         admin.tenants().deleteTenant("p1");
 
         admin.clusters().deleteCluster("c1");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index f9aa17ea3c4..c46f4744cd5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -263,7 +263,9 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         closeAdmin();
         admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build());
         admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
-        admin.topics().grantPermission("persistent://my-property/my-ns/my-topic", "anonymousUser",
+        String topic = "persistent://my-property/my-ns/my-topic";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().grantPermission(topic, "anonymousUser",
                 EnumSet.allOf(AuthAction.class));
 
         // setup the client
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 769486054ab..3ead51ad7fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -234,6 +234,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         }
 
         // grant topic consume authorization to the subscriptionRole
+        tenantAdmin.topics().createNonPartitionedTopic(topicName);
         tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
                 Collections.singleton(AuthAction.consume));
 
@@ -773,6 +774,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         admin.tenants().createTenant("my-property",
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
         admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+        admin.topics().createNonPartitionedTopic(topic);
         admin.topics().grantPermission(topic, invalidRole, Collections.singleton(AuthAction.produce));
         admin.topics().grantPermission(topic, producerRole, Sets.newHashSet(AuthAction.produce, AuthAction.consume));
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index d4f7c72bed0..2d00e15a13f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -55,6 +55,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
     @Override
     protected void setup() throws Exception {
         conf.setClusterName(configClusterName);
+        conf.setForceDeleteNamespaceAllowed(true);
         internalSetup();
 
         WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
@@ -99,8 +100,9 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
         assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
         assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
 
-        admin.topics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
-                EnumSet.of(AuthAction.consume));
+        String topic = "persistent://p1/c1/ns1/ds2";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().grantPermission(topic, "other-role", EnumSet.of(AuthAction.consume));
         waitForChange();
 
         assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
@@ -117,7 +119,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
         assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
         assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null));
 
-        admin.namespaces().deleteNamespace("p1/c1/ns1");
+        admin.namespaces().deleteNamespace("p1/c1/ns1", true);
         admin.tenants().deleteTenant("p1");
         admin.clusters().deleteCluster("c1");
     }