You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2024/03/04 03:17:08 UTC
(pulsar) branch branch-3.0 updated: [improve][broker] Add fine-grain authorization to retention admin API (#22163)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f0da29b6400 [improve][broker] Add fine-grain authorization to retention admin API (#22163)
f0da29b6400 is described below
commit f0da29b64007eaf21e83a5e9946558333035cfef
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Thu Feb 29 18:57:03 2024 +0800
[improve][broker] Add fine-grain authorization to retention admin API (#22163)
(cherry picked from commit 6ec473ed6458cf30e1fc7062057a50bfefada6cf)
---
.../pulsar/broker/admin/v2/PersistentTopics.java | 9 +-
.../broker/admin/TopicPoliciesAuthZTest.java | 174 +++++++++++++++++++++
.../pulsar/security/MockedPulsarStandalone.java | 155 ++++++++++++++++++
3 files changed, 335 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index eee363aeed8..15d23d6e4d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2406,7 +2406,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetRetention(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2433,7 +2434,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Retention policies for the specified topic") RetentionPolicies retention) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetRetention(retention, isGlobal))
.thenRun(() -> {
try {
@@ -2469,7 +2471,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveRetention(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove retention: namespace={}, topic={}",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
new file mode 100644
index 00000000000..9cf4c111ddd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.awaitility.Awaitility.await;
+import io.jsonwebtoken.Jwts;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {
+
+ private PulsarAdmin superUserAdmin;
+
+ private PulsarAdmin tenantManagerAdmin;
+
+ private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString();
+ private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+ .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+ @SneakyThrows
+ @BeforeClass
+ public void before() {
+ configureTokenAuthentication();
+ configureDefaultAuthorization();
+ start();
+ this.superUserAdmin =PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+ .build();
+ final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public");
+ tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+ superUserAdmin.tenants().updateTenant("public", tenantInfo);
+ this.tenantManagerAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+ .build();
+ }
+
+
+ @SneakyThrows
+ @AfterClass
+ public void after() {
+ close();
+ }
+
+
+ @SneakyThrows
+ @Test
+ public void testRetention() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ final RetentionPolicies definedRetentionPolicy = new RetentionPolicies(1, 1);
+ // test superuser
+ superUserAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
+
+ // because the topic policies is eventual consistency, we should wait here
+ await().untilAsserted(() -> {
+ final RetentionPolicies receivedRetentionPolicy = superUserAdmin.topicPolicies().getRetention(topic);
+ Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
+ });
+ superUserAdmin.topicPolicies().removeRetention(topic);
+
+ await().untilAsserted(() -> {
+ final RetentionPolicies retention = superUserAdmin.topicPolicies().getRetention(topic);
+ Assert.assertNull(retention);
+ });
+
+ // test tenant manager
+
+ tenantManagerAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
+ await().untilAsserted(() -> {
+ final RetentionPolicies receivedRetentionPolicy = tenantManagerAdmin.topicPolicies().getRetention(topic);
+ Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
+ });
+ tenantManagerAdmin.topicPolicies().removeRetention(topic);
+ await().untilAsserted(() -> {
+ final RetentionPolicies retention = tenantManagerAdmin.topicPolicies().getRetention(topic);
+ Assert.assertNull(retention);
+ });
+
+ // test nobody
+
+ try {
+ subAdmin.topicPolicies().getRetention(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeRetention(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Set.of(action));
+ try {
+ subAdmin.topicPolicies().getRetention(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeRetention(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+ superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+ }
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
new file mode 100644
index 00000000000..20dd2e1066a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.security;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import javax.crypto.SecretKey;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+
+public abstract class MockedPulsarStandalone implements AutoCloseable {
+
+ @Getter
+ private final ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+ private PulsarTestContext pulsarTestContext;
+
+ @Getter
+ private PulsarService pulsarService;
+ private PulsarAdmin serviceInternalAdmin;
+
+
+ {
+ serviceConfiguration.setClusterName(TEST_CLUSTER_NAME);
+ serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
+ serviceConfiguration.setBrokerServicePort(Optional.of(0));
+ serviceConfiguration.setBrokerServicePortTls(Optional.of(0));
+ serviceConfiguration.setAdvertisedAddress("localhost");
+ serviceConfiguration.setWebServicePort(Optional.of(0));
+ serviceConfiguration.setWebServicePortTls(Optional.of(0));
+ serviceConfiguration.setNumExecutorThreadPoolSize(5);
+ serviceConfiguration.setExposeBundlesMetricsInPrometheus(true);
+ }
+
+
+ protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal";
+ private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
+ .claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
+ protected static final String SUPER_USER_SUBJECT = "super-user";
+ protected static final String SUPER_USER_TOKEN = Jwts.builder()
+ .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
+ protected static final String NOBODY_SUBJECT = "nobody";
+ protected static final String NOBODY_TOKEN = Jwts.builder()
+ .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();
+
+
+ @SneakyThrows
+ protected void configureTokenAuthentication() {
+ serviceConfiguration.setAuthenticationEnabled(true);
+ serviceConfiguration.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+ // internal client
+ serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ final Map<String, String> brokerClientAuthParams = new HashMap<>();
+ brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
+ final String brokerClientAuthParamStr = MAPPER.writeValueAsString(brokerClientAuthParams);
+ serviceConfiguration.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
+
+ Properties properties = serviceConfiguration.getProperties();
+ if (properties == null) {
+ properties = new Properties();
+ serviceConfiguration.setProperties(properties);
+ }
+ properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+
+ }
+
+
+
+ protected void configureDefaultAuthorization() {
+ serviceConfiguration.setAuthorizationEnabled(true);
+ serviceConfiguration.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT));
+ }
+
+
+ @SneakyThrows
+ protected void start() {
+ this.pulsarTestContext = PulsarTestContext.builder()
+ .spyByDefault()
+ .config(serviceConfiguration)
+ .withMockZookeeper(false)
+ .build();
+ this.pulsarService = pulsarTestContext.getPulsarService();
+ this.serviceInternalAdmin = pulsarService.getAdminClient();
+ setupDefaultTenantAndNamespace();
+ }
+
+ private void setupDefaultTenantAndNamespace() throws Exception {
+ if (!serviceInternalAdmin.clusters().getClusters().contains(TEST_CLUSTER_NAME)) {
+ serviceInternalAdmin.clusters().createCluster(TEST_CLUSTER_NAME,
+ ClusterData.builder().serviceUrl(pulsarService.getWebServiceAddress()).build());
+ }
+ if (!serviceInternalAdmin.tenants().getTenants().contains(DEFAULT_TENANT)) {
+ serviceInternalAdmin.tenants().createTenant(DEFAULT_TENANT, TenantInfo.builder().allowedClusters(
+ Sets.newHashSet(TEST_CLUSTER_NAME)).build());
+ }
+ if (!serviceInternalAdmin.namespaces().getNamespaces(DEFAULT_TENANT).contains(DEFAULT_NAMESPACE)) {
+ serviceInternalAdmin.namespaces().createNamespace(DEFAULT_NAMESPACE);
+ }
+ }
+
+
+ @Override
+ public void close() throws Exception {
+ if (pulsarTestContext != null) {
+ pulsarTestContext.close();
+ }
+ }
+
+ // Utils
+ protected static final ObjectMapper mapper = new ObjectMapper();
+
+ // Static name
+ private static final String DEFAULT_TENANT = "public";
+ private static final String DEFAULT_NAMESPACE = "public/default";
+ private static final String TEST_CLUSTER_NAME = "test-standalone";
+
+ private static final ObjectMapper MAPPER = ObjectMapperFactory.getMapper().getObjectMapper();
+}