You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2024/03/04 03:17:08 UTC

(pulsar) branch branch-3.0 updated: [improve][broker] Add fine-grain authorization to retention admin API (#22163)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f0da29b6400 [improve][broker] Add fine-grain authorization to retention admin API (#22163)
f0da29b6400 is described below

commit f0da29b64007eaf21e83a5e9946558333035cfef
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Thu Feb 29 18:57:03 2024 +0800

    [improve][broker] Add fine-grain authorization to retention admin API (#22163)
    
    (cherry picked from commit 6ec473ed6458cf30e1fc7062057a50bfefada6cf)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   9 +-
 .../broker/admin/TopicPoliciesAuthZTest.java       | 174 +++++++++++++++++++++
 .../pulsar/security/MockedPulsarStandalone.java    | 155 ++++++++++++++++++
 3 files changed, 335 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index eee363aeed8..15d23d6e4d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2406,7 +2406,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalGetRetention(applied, isGlobal))
             .thenAccept(asyncResponse::resume)
             .exceptionally(ex -> {
@@ -2433,7 +2434,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Retention policies for the specified topic") RetentionPolicies retention) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
+            .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalSetRetention(retention, isGlobal))
             .thenRun(() -> {
                 try {
@@ -2469,7 +2471,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        preValidation(authoritative)
+        validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
             .thenCompose(__ -> internalRemoveRetention(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove retention: namespace={}, topic={}",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
new file mode 100644
index 00000000000..9cf4c111ddd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.awaitility.Awaitility.await;
+import io.jsonwebtoken.Jwts;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {
+    // This class tests authorization of the topic-level retention admin calls (get / set / remove).
+    private PulsarAdmin superUserAdmin; // built in before() with SUPER_USER_TOKEN
+
+    private PulsarAdmin tenantManagerAdmin; // built in before() with TENANT_ADMIN_TOKEN
+    // Random subject that before() adds to the admin roles of tenant "public", plus a token signed for it.
+    private static final String TENANT_ADMIN_SUBJECT =  UUID.randomUUID().toString();
+    private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+    // Configures token authentication and default authorization, calls start(), then builds both admin clients.
+    @SneakyThrows
+    @BeforeClass
+    public void before() {
+        configureTokenAuthentication();
+        configureDefaultAuthorization();
+        start();
+        this.superUserAdmin =PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+                .build();
+        final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public");
+        tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT); // make the random subject an admin of "public"
+        superUserAdmin.tenants().updateTenant("public", tenantInfo);
+        this.tenantManagerAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+                .build();
+    }
+
+    // Tears down through close(). NOTE(review): superUserAdmin / tenantManagerAdmin are not closed here - confirm.
+    @SneakyThrows
+    @AfterClass
+    public void after() {
+        close();
+    }
+
+    // Super user and tenant admin may get/set/remove retention; any other role must get NotAuthorizedException.
+    @SneakyThrows
+    @Test
+    public void testRetention() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+        // admin client for a fresh random subject that has been granted nothing
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        final RetentionPolicies definedRetentionPolicy = new RetentionPolicies(1, 1);
+        // super user: set the policy, read it back, then remove it
+        superUserAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
+
+        // topic policies are eventually consistent, so poll until the change is visible
+        await().untilAsserted(() -> {
+            final RetentionPolicies receivedRetentionPolicy = superUserAdmin.topicPolicies().getRetention(topic);
+             Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
+        });
+        superUserAdmin.topicPolicies().removeRetention(topic);
+
+        await().untilAsserted(() -> {
+            final RetentionPolicies retention = superUserAdmin.topicPolicies().getRetention(topic);
+            Assert.assertNull(retention);
+        });
+
+        // tenant admin: the same set / read back / remove cycle must succeed
+
+        tenantManagerAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
+        await().untilAsserted(() -> {
+            final RetentionPolicies receivedRetentionPolicy = tenantManagerAdmin.topicPolicies().getRetention(topic);
+            Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
+        });
+        tenantManagerAdmin.topicPolicies().removeRetention(topic);
+        await().untilAsserted(() -> {
+            final RetentionPolicies retention = tenantManagerAdmin.topicPolicies().getRetention(topic);
+            Assert.assertNull(retention);
+        });
+
+        // subject with no permissions: each of get / set / remove must throw NotAuthorizedException
+
+        try {
+            subAdmin.topicPolicies().getRetention(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+
+            subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().removeRetention(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+        }
+
+        // granting any single AuthAction on the namespace must still leave all three calls unauthorized
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+                    subject, Set.of(action));
+            try {
+                subAdmin.topicPolicies().getRetention(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+
+                subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().removeRetention(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+            }
+            superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
new file mode 100644
index 00000000000..20dd2e1066a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.security;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import javax.crypto.SecretKey;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+
+public abstract class MockedPulsarStandalone implements AutoCloseable {
+    // Test base class: subclasses adjust serviceConfiguration via configure*() and then call start().
+    @Getter
+    private final ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+    private PulsarTestContext pulsarTestContext;
+
+    @Getter
+    private PulsarService pulsarService;
+    private PulsarAdmin serviceInternalAdmin;
+
+    // Instance initializer: baseline settings applied when each subclass instance is constructed.
+    {
+        serviceConfiguration.setClusterName(TEST_CLUSTER_NAME);
+        serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
+        serviceConfiguration.setBrokerServicePort(Optional.of(0)); // 0: presumably any free port - confirm
+        serviceConfiguration.setBrokerServicePortTls(Optional.of(0));
+        serviceConfiguration.setAdvertisedAddress("localhost");
+        serviceConfiguration.setWebServicePort(Optional.of(0));
+        serviceConfiguration.setWebServicePortTls(Optional.of(0));
+        serviceConfiguration.setNumExecutorThreadPoolSize(5);
+        serviceConfiguration.setExposeBundlesMetricsInPrometheus(true);
+    }
+
+    // Key that signs every token below; configureTokenAuthentication() also passes it to the broker configuration.
+    protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    // Tokens whose "sub" claim is the matching *_SUBJECT constant.
+    private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal";
+    private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
+            .claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
+    protected static final String SUPER_USER_SUBJECT = "super-user";
+    protected static final String SUPER_USER_TOKEN = Jwts.builder()
+            .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
+    protected static final String NOBODY_SUBJECT =  "nobody";
+    protected static final String NOBODY_TOKEN = Jwts.builder()
+            .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();
+
+    // Enables token authentication and gives the broker's internal client a token signed with the same key.
+    @SneakyThrows
+    protected void configureTokenAuthentication() {
+        serviceConfiguration.setAuthenticationEnabled(true);
+        serviceConfiguration.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        // credentials the broker's own client presents (BROKER_INTERNAL_CLIENT_TOKEN, serialized as JSON)
+        serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        final Map<String, String> brokerClientAuthParams = new HashMap<>();
+        brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
+        final String brokerClientAuthParamStr = MAPPER.writeValueAsString(brokerClientAuthParams);
+        serviceConfiguration.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
+        // expose the Base64-encoded signing key as the "tokenSecretKey" property, creating Properties if absent
+        Properties properties = serviceConfiguration.getProperties();
+        if (properties == null) {
+            properties = new Properties();
+            serviceConfiguration.setProperties(properties);
+        }
+        properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+
+    }
+
+
+    // Enables PulsarAuthorizationProvider; SUPER_USER_SUBJECT and the broker-internal subject become super users.
+    protected void configureDefaultAuthorization() {
+        serviceConfiguration.setAuthorizationEnabled(true);
+        serviceConfiguration.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT));
+    }
+
+    // Builds the test context from serviceConfiguration, then ensures the default cluster/tenant/namespace exist.
+    @SneakyThrows
+    protected void start() {
+        this.pulsarTestContext = PulsarTestContext.builder()
+                .spyByDefault()
+                .config(serviceConfiguration)
+                .withMockZookeeper(false)
+                .build();
+        this.pulsarService = pulsarTestContext.getPulsarService();
+        this.serviceInternalAdmin = pulsarService.getAdminClient();
+        setupDefaultTenantAndNamespace();
+    }
+    // Creates TEST_CLUSTER_NAME, DEFAULT_TENANT and DEFAULT_NAMESPACE when they are not already listed.
+    private void setupDefaultTenantAndNamespace() throws Exception {
+        if (!serviceInternalAdmin.clusters().getClusters().contains(TEST_CLUSTER_NAME)) {
+            serviceInternalAdmin.clusters().createCluster(TEST_CLUSTER_NAME,
+                    ClusterData.builder().serviceUrl(pulsarService.getWebServiceAddress()).build());
+        }
+        if (!serviceInternalAdmin.tenants().getTenants().contains(DEFAULT_TENANT)) {
+            serviceInternalAdmin.tenants().createTenant(DEFAULT_TENANT, TenantInfo.builder().allowedClusters(
+                    Sets.newHashSet(TEST_CLUSTER_NAME)).build());
+        }
+        if (!serviceInternalAdmin.namespaces().getNamespaces(DEFAULT_TENANT).contains(DEFAULT_NAMESPACE)) {
+            serviceInternalAdmin.namespaces().createNamespace(DEFAULT_NAMESPACE);
+        }
+    }
+
+    // Closes the test context if start() created one; a no-op otherwise.
+    @Override
+    public void close() throws Exception {
+        if (pulsarTestContext != null) {
+            pulsarTestContext.close();
+        }
+    }
+
+    // Utils. NOTE(review): 'mapper' is unused in this file (MAPPER below is used instead) - confirm.
+    protected static final ObjectMapper mapper = new ObjectMapper();
+
+    // Fixed names of the cluster, tenant and namespace created by setupDefaultTenantAndNamespace()
+    private static final String DEFAULT_TENANT = "public";
+    private static final String DEFAULT_NAMESPACE = "public/default";
+    private static final String TEST_CLUSTER_NAME = "test-standalone";
+    // Serializes the broker client auth parameters in configureTokenAuthentication().
+    private static final ObjectMapper MAPPER = ObjectMapperFactory.getMapper().getObjectMapper();
+}