You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/10 15:16:59 UTC
[pulsar] 02/12: [fix][authorization] Fix multiple roles authorization (#16645)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0cc42d9130d6245a3b278ff183cb72be4c1a4927
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jul 25 17:57:19 2022 +0800
[fix][authorization] Fix multiple roles authorization (#16645)
(cherry picked from commit d8483d48cb21e8e99fd56c786e5198f7fe7135f6)
---
.../MultiRolesTokenAuthorizationProvider.java | 86 ++++++--
.../authorization/PulsarAuthorizationProvider.java | 3 +-
.../MultiRolesTokenAuthorizationProviderTest.java | 231 +++++++++++++++++++++
3 files changed, 307 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
index b8f46a52483..d72c951c889 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -26,9 +26,12 @@ import io.jsonwebtoken.RequiredTypeException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -38,9 +41,12 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,56 +85,112 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
super.initialize(conf, pulsarResources);
}
- private List<String> getRoles(AuthenticationDataSource authData) {
+ @Override
+ public CompletableFuture<Boolean> isSuperUser(String role, AuthenticationDataSource authenticationData,
+ ServiceConfiguration serviceConfiguration) {
+ Set<String> roles = getRoles(authenticationData);
+ if (roles.isEmpty()) {
+ return CompletableFuture.completedFuture(false);
+ }
+ Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles();
+ if (superUserRoles.isEmpty()) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ return CompletableFuture.completedFuture(roles.stream().anyMatch(superUserRoles::contains));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName, String role,
+ AuthenticationDataSource authData) {
+ return isSuperUser(role, authData, conf)
+ .thenCompose(isSuperUser -> {
+ if (isSuperUser) {
+ return CompletableFuture.completedFuture(true);
+ }
+ Set<String> roles = getRoles(authData);
+ if (roles.isEmpty()) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ return pulsarResources.getTenantResources()
+ .getTenantAsync(tenantName)
+ .thenCompose(op -> {
+ if (op.isPresent()) {
+ TenantInfo tenantInfo = op.get();
+ if (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ return CompletableFuture.completedFuture(roles.stream()
+ .anyMatch(n -> tenantInfo.getAdminRoles().contains(n)));
+ } else {
+ throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
+ }
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ if (cause instanceof MetadataStoreException.NotFoundException) {
+ log.warn("Failed to get tenant info data for non existing tenant {}", tenantName);
+ throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
+ }
+ log.error("Failed to get tenant {}", tenantName, cause);
+ throw new RestException(cause);
+ });
+ });
+ }
+
+ private Set<String> getRoles(AuthenticationDataSource authData) {
String token = null;
if (authData.hasDataFromCommand()) {
// Authenticate Pulsar binary connection
token = authData.getCommandData();
if (StringUtils.isBlank(token)) {
- return Collections.emptyList();
+ return Collections.emptySet();
}
} else if (authData.hasDataFromHttp()) {
// The format here should be compliant to RFC-6750
// (https://tools.ietf.org/html/rfc6750#section-2.1). Eg: Authorization: Bearer xxxxxxxxxxxxx
String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME);
if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
- return Collections.emptyList();
+ return Collections.emptySet();
}
// Remove prefix
token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length());
}
- if (token == null)
- return Collections.emptyList();
+ if (token == null) {
+ return Collections.emptySet();
+ }
String[] splitToken = token.split("\\.");
if (splitToken.length < 2) {
log.warn("Unable to extract additional roles from JWT token");
- return Collections.emptyList();
+ return Collections.emptySet();
}
String unsignedToken = splitToken[0] + "." + splitToken[1] + ".";
Jwt<?, Claims> jwt = parser.parseClaimsJwt(unsignedToken);
try {
- return Collections.singletonList(jwt.getBody().get(roleClaim, String.class));
+ return new HashSet<>(Collections.singletonList(jwt.getBody().get(roleClaim, String.class)));
} catch (RequiredTypeException requiredTypeException) {
try {
List list = jwt.getBody().get(roleClaim, List.class);
if (list != null) {
- return list;
+ return new HashSet<String>(list);
}
} catch (RequiredTypeException requiredTypeException1) {
- return Collections.emptyList();
+ return Collections.emptySet();
}
}
- return Collections.emptyList();
+ return Collections.emptySet();
}
- public CompletableFuture<Boolean> authorize(AuthenticationDataSource authenticationData, Function<String, CompletableFuture<Boolean>> authorizeFunc) {
- List<String> roles = getRoles(authenticationData);
+ public CompletableFuture<Boolean> authorize(AuthenticationDataSource authenticationData, Function<String,
+ CompletableFuture<Boolean>> authorizeFunc) {
+ Set<String> roles = getRoles(authenticationData);
if (roles.isEmpty()) {
return CompletableFuture.completedFuture(false);
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 097464bfb5f..b753d2ed634 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -59,7 +59,8 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
private static final Logger log = LoggerFactory.getLogger(PulsarAuthorizationProvider.class);
public ServiceConfiguration conf;
- private PulsarResources pulsarResources;
+
+ protected PulsarResources pulsarResources;
public PulsarAuthorizationProvider() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
new file mode 100644
index 00000000000..12d7c71358b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.MultiRolesTokenAuthorizationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiRolesTokenAuthorizationProviderTest extends MockedPulsarServiceBaseTest {
+
+ private final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+ private final String superUserToken;
+ private final String normalUserToken;
+
+ public MultiRolesTokenAuthorizationProviderTest() {
+ Map<String, Object> claims = new HashMap<>();
+ Set<String> roles = new HashSet<>();
+ roles.add("user1");
+ roles.add("superUser");
+ claims.put("roles", roles);
+ superUserToken = Jwts.builder()
+ .setClaims(claims)
+ .signWith(secretKey)
+ .compact();
+
+ roles = new HashSet<>();
+ roles.add("normalUser");
+ roles.add("user2");
+ roles.add("user5");
+ claims.put("roles", roles);
+ normalUserToken = Jwts.builder()
+ .setClaims(claims)
+ .signWith(secretKey)
+ .compact();
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add("superUser");
+ conf.setSuperUserRoles(superUserRoles);
+
+ Properties properties = new Properties();
+ properties.setProperty("tokenSecretKey",
+ "data:;base64," + Base64.getEncoder().encodeToString(secretKey.getEncoded()));
+ properties.setProperty("tokenAuthClaim", "roles");
+ conf.setProperties(properties);
+
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters(superUserToken);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ conf.setAuthenticationProviders(providers);
+ conf.setAuthorizationProvider(MultiRolesTokenAuthorizationProvider.class.getName());
+
+ conf.setClusterName(configClusterName);
+ conf.setNumExecutorThreadPoolSize(5);
+ }
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+
+ admin.clusters().createCluster(configClusterName,
+ ClusterData.builder()
+ .brokerServiceUrl(brokerUrl.toString())
+ .serviceUrl(getPulsar().getWebServiceAddress())
+ .build()
+ );
+ }
+
+ @BeforeClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+ clientBuilder.authentication(new AuthenticationToken(superUserToken));
+ }
+
+ @Override
+ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+ pulsarAdminBuilder.authentication(new AuthenticationToken(superUserToken));
+ }
+
+ private PulsarAdmin newPulsarAdmin(String token) throws PulsarClientException {
+ return PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .requestTimeout(3, TimeUnit.SECONDS)
+ .build();
+ }
+
+ private PulsarClient newPulsarClient(String token) throws PulsarClientException {
+ return PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(new AuthenticationToken(token))
+ .operationTimeout(3, TimeUnit.SECONDS)
+ .build();
+ }
+
+ @Test
+ public void testAdminRequestWithSuperUserToken() throws Exception {
+ String tenant = "superuser-admin-tenant";
+ @Cleanup
+ PulsarAdmin admin = newPulsarAdmin(superUserToken);
+ admin.tenants().createTenant(tenant, TenantInfo.builder()
+ .allowedClusters(Sets.newHashSet(configClusterName)).build());
+ String namespace = "superuser-admin-namespace";
+ admin.namespaces().createNamespace(tenant + "/" + namespace);
+ admin.brokers().getAllDynamicConfigurations();
+ admin.tenants().getTenants();
+ admin.topics().getList(tenant + "/" + namespace);
+ }
+
+ @Test
+ public void testProduceAndConsumeWithSuperUserToken() throws Exception {
+ String tenant = "superuser-client-tenant";
+ @Cleanup
+ PulsarAdmin admin = newPulsarAdmin(superUserToken);
+ admin.tenants().createTenant(tenant, TenantInfo.builder()
+ .allowedClusters(Sets.newHashSet(configClusterName)).build());
+ String namespace = "superuser-client-namespace";
+ admin.namespaces().createNamespace(tenant + "/" + namespace);
+ String topic = tenant + "/" + namespace + "/" + "test-topic";
+
+ @Cleanup
+ PulsarClient client = newPulsarClient(superUserToken);
+ @Cleanup
+ Producer<byte[]> producer = client.newProducer().topic(topic).create();
+ byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
+ producer.send(body);
+
+ @Cleanup
+ Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test")
+ .subscribe();
+ Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertEquals(message.getData(), body);
+ }
+
+ @Test
+ public void testAdminRequestWithNormalUserToken() throws Exception {
+ String tenant = "normaluser-admin-tenant";
+ @Cleanup
+ PulsarAdmin admin = newPulsarAdmin(normalUserToken);
+
+ assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> admin.tenants().createTenant(tenant, TenantInfo.builder()
+ .allowedClusters(Sets.newHashSet(configClusterName)).build()));
+ }
+
+ @Test
+ public void testProduceAndConsumeWithNormalUserToken() throws Exception {
+ String tenant = "normaluser-client-tenant";
+ @Cleanup
+ PulsarAdmin admin = newPulsarAdmin(superUserToken);
+ admin.tenants().createTenant(tenant, TenantInfo.builder()
+ .allowedClusters(Sets.newHashSet(configClusterName)).build());
+ String namespace = "normaluser-client-namespace";
+ admin.namespaces().createNamespace(tenant + "/" + namespace);
+ String topic = tenant + "/" + namespace + "/" + "test-topic";
+
+ @Cleanup
+ PulsarClient client = newPulsarClient(normalUserToken);
+ assertThrows(PulsarClientException.AuthorizationException.class, () -> {
+ @Cleanup
+ Producer<byte[]> ignored = client.newProducer().topic(topic).create();
+ });
+
+ assertThrows(PulsarClientException.AuthorizationException.class, () -> {
+ @Cleanup
+ Consumer<byte[]> ignored = client.newConsumer().topic(topic)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test")
+ .subscribe();
+ });
+ }
+}