You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/10 15:16:59 UTC

[pulsar] 02/12: [fix][authorization] Fix multiple roles authorization (#16645)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0cc42d9130d6245a3b278ff183cb72be4c1a4927
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jul 25 17:57:19 2022 +0800

    [fix][authorization] Fix multiple roles authorization (#16645)
    
    (cherry picked from commit d8483d48cb21e8e99fd56c786e5198f7fe7135f6)
---
 .../MultiRolesTokenAuthorizationProvider.java      |  86 ++++++--
 .../authorization/PulsarAuthorizationProvider.java |   3 +-
 .../MultiRolesTokenAuthorizationProviderTest.java  | 231 +++++++++++++++++++++
 3 files changed, 307 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
index b8f46a52483..d72c951c889 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -26,9 +26,12 @@ import io.jsonwebtoken.RequiredTypeException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -38,9 +41,12 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,56 +85,112 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
         super.initialize(conf, pulsarResources);
     }
 
-    private List<String> getRoles(AuthenticationDataSource authData) {
+    @Override
+    public CompletableFuture<Boolean> isSuperUser(String role, AuthenticationDataSource authenticationData,
+                                                  ServiceConfiguration serviceConfiguration) {
+        Set<String> roles = getRoles(authenticationData);
+        if (roles.isEmpty()) {
+            return CompletableFuture.completedFuture(false);
+        }
+        Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles();
+        if (superUserRoles.isEmpty()) {
+            return CompletableFuture.completedFuture(false);
+        }
+
+        return CompletableFuture.completedFuture(roles.stream().anyMatch(superUserRoles::contains));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName, String role,
+                                                                AuthenticationDataSource authData) {
+        return isSuperUser(role, authData, conf)
+                .thenCompose(isSuperUser -> {
+                    if (isSuperUser) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    Set<String> roles = getRoles(authData);
+                    if (roles.isEmpty()) {
+                        return CompletableFuture.completedFuture(false);
+                    }
+
+                    return pulsarResources.getTenantResources()
+                            .getTenantAsync(tenantName)
+                            .thenCompose(op -> {
+                                if (op.isPresent()) {
+                                    TenantInfo tenantInfo = op.get();
+                                    if (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()) {
+                                        return CompletableFuture.completedFuture(false);
+                                    }
+
+                                    return CompletableFuture.completedFuture(roles.stream()
+                                            .anyMatch(n -> tenantInfo.getAdminRoles().contains(n)));
+                                } else {
+                                    throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
+                                }
+                            }).exceptionally(ex -> {
+                                Throwable cause = ex.getCause();
+                                if (cause instanceof MetadataStoreException.NotFoundException) {
+                                    log.warn("Failed to get tenant info data for non existing tenant {}", tenantName);
+                                    throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
+                                }
+                                log.error("Failed to get tenant {}", tenantName, cause);
+                                throw new RestException(cause);
+                            });
+                });
+    }
+
+    /**
+     * Extracts the roles listed under the {@code roleClaim} claim of the caller's token.
+     *
+     * <p>The token is read from the binary-protocol command data or, failing that, from the HTTP
+     * header named by {@code HTTP_HEADER_NAME}. Only the first two dot-separated segments of the
+     * token are handed to the parser. The claim may be a single string or a list; list elements that
+     * are not strings (including {@code null}) are skipped.
+     *
+     * @param authData source of the token
+     * @return the roles found, or an empty set when no usable token or claim is present; never
+     *         contains {@code null}
+     */
+    private Set<String> getRoles(AuthenticationDataSource authData) {
         String token = null;
 
         if (authData.hasDataFromCommand()) {
             // Authenticate Pulsar binary connection
             token = authData.getCommandData();
             if (StringUtils.isBlank(token)) {
-                return Collections.emptyList();
+                return Collections.emptySet();
             }
         } else if (authData.hasDataFromHttp()) {
             // The format here should be compliant to RFC-6750
             // (https://tools.ietf.org/html/rfc6750#section-2.1). Eg: Authorization: Bearer xxxxxxxxxxxxx
             String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME);
             if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
-                return Collections.emptyList();
+                return Collections.emptySet();
             }
 
             // Remove prefix
             token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length());
         }
 
-        if (token == null)
-            return Collections.emptyList();
+        if (token == null) {
+            return Collections.emptySet();
+        }
 
         String[] splitToken = token.split("\\.");
         if (splitToken.length < 2) {
             log.warn("Unable to extract additional roles from JWT token");
-            return Collections.emptyList();
+            return Collections.emptySet();
         }
         String unsignedToken = splitToken[0] + "." + splitToken[1] + ".";
 
         Jwt<?, Claims> jwt = parser.parseClaimsJwt(unsignedToken);
         try {
-            return Collections.singletonList(jwt.getBody().get(roleClaim, String.class));
+            String singleRole = jwt.getBody().get(roleClaim, String.class);
+            if (singleRole == null) {
+                // The claim is absent: report no roles instead of a set holding a null role, which
+                // would otherwise reach Set.contains(...) and the authorization callbacks.
+                return Collections.emptySet();
+            }
+            return new HashSet<>(Collections.singletonList(singleRole));
         } catch (RequiredTypeException requiredTypeException) {
             try {
                 List list = jwt.getBody().get(roleClaim, List.class);
                 if (list != null) {
-                    return list;
+                    // The claim list is untyped: keep only genuine strings so the result is a
+                    // well-typed Set<String> without null entries.
+                    Set<String> listedRoles = new HashSet<>();
+                    for (Object element : list) {
+                        if (element instanceof String) {
+                            listedRoles.add((String) element);
+                        }
+                    }
+                    return listedRoles;
                 }
             } catch (RequiredTypeException requiredTypeException1) {
-                return Collections.emptyList();
+                return Collections.emptySet();
             }
         }
 
-        return Collections.emptyList();
+        return Collections.emptySet();
     }
 
-    public CompletableFuture<Boolean> authorize(AuthenticationDataSource authenticationData, Function<String, CompletableFuture<Boolean>> authorizeFunc) {
-        List<String> roles = getRoles(authenticationData);
+    public CompletableFuture<Boolean> authorize(AuthenticationDataSource authenticationData, Function<String,
+            CompletableFuture<Boolean>> authorizeFunc) {
+        Set<String> roles = getRoles(authenticationData);
         if (roles.isEmpty()) {
             return CompletableFuture.completedFuture(false);
         }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 097464bfb5f..b753d2ed634 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -59,7 +59,8 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
     private static final Logger log = LoggerFactory.getLogger(PulsarAuthorizationProvider.class);
 
     public ServiceConfiguration conf;
-    private PulsarResources pulsarResources;
+
+    protected PulsarResources pulsarResources;
 
 
     public PulsarAuthorizationProvider() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
new file mode 100644
index 00000000000..12d7c71358b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.MultiRolesTokenAuthorizationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Broker-level tests for {@link MultiRolesTokenAuthorizationProvider} using tokens whose
+ * {@code roles} claim lists several roles.
+ *
+ * <p>Two tokens are signed with the same secret key: one whose roles include the configured
+ * super-user role {@code superUser}, and one that only carries ordinary roles. The tests check that
+ * the former can use the admin and client APIs while the latter is rejected.
+ */
+public class MultiRolesTokenAuthorizationProviderTest extends MockedPulsarServiceBaseTest {
+
+    private final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    /** Token whose {@code roles} claim contains {@code user1} and {@code superUser}. */
+    private final String superUserToken;
+    /** Token whose {@code roles} claim contains {@code normalUser}, {@code user2} and {@code user5}. */
+    private final String normalUserToken;
+
+    /** Builds both tokens up front so they are available to {@link #doInitConf()}. */
+    public MultiRolesTokenAuthorizationProviderTest() {
+        Map<String, Object> claims = new HashMap<>();
+        Set<String> roles = new HashSet<>();
+        roles.add("user1");
+        roles.add("superUser");
+        claims.put("roles", roles);
+        superUserToken = Jwts.builder()
+                .setClaims(claims)
+                .signWith(secretKey)
+                .compact();
+
+        roles = new HashSet<>();
+        roles.add("normalUser");
+        roles.add("user2");
+        roles.add("user5");
+        claims.put("roles", roles);
+        normalUserToken = Jwts.builder()
+                .setClaims(claims)
+                .signWith(secretKey)
+                .compact();
+    }
+
+    /**
+     * Enables token authentication and multi-role authorization on the broker, with
+     * {@code superUser} as the only super-user role and {@code roles} as the token claim.
+     */
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey",
+                "data:;base64," + Base64.getEncoder().encodeToString(secretKey.getEncoded()));
+        properties.setProperty("tokenAuthClaim", "roles");
+        conf.setProperties(properties);
+
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters(superUserToken);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+        conf.setAuthorizationProvider(MultiRolesTokenAuthorizationProvider.class.getName());
+
+        conf.setClusterName(configClusterName);
+        conf.setNumExecutorThreadPoolSize(5);
+    }
+
+    /** Starts the mocked broker once for the class and registers the test cluster. */
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster(configClusterName,
+                ClusterData.builder()
+                        .brokerServiceUrl(brokerUrl.toString())
+                        .serviceUrl(getPulsar().getWebServiceAddress())
+                        .build()
+        );
+    }
+
+    /**
+     * Releases the mocked broker after all tests in the class have run.
+     *
+     * <p>This is a teardown hook, so it is registered with {@code @AfterClass}; {@code alwaysRun}
+     * keeps the cleanup in place even when an earlier configuration or test method failed.
+     */
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.authentication(new AuthenticationToken(superUserToken));
+    }
+
+    @Override
+    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+        pulsarAdminBuilder.authentication(new AuthenticationToken(superUserToken));
+    }
+
+    /** Creates an admin client authenticated with {@code token}, using a 3-second request timeout. */
+    private PulsarAdmin newPulsarAdmin(String token) throws PulsarClientException {
+        return PulsarAdmin.builder()
+                .serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .requestTimeout(3, TimeUnit.SECONDS)
+                .build();
+    }
+
+    /** Creates a client authenticated with {@code token}, using a 3-second operation timeout. */
+    private PulsarClient newPulsarClient(String token) throws PulsarClientException {
+        return PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new AuthenticationToken(token))
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .build();
+    }
+
+    /** A token containing the super-user role can perform tenant, namespace and broker admin calls. */
+    @Test
+    public void testAdminRequestWithSuperUserToken() throws Exception {
+        String tenant = "superuser-admin-tenant";
+        @Cleanup
+        PulsarAdmin admin = newPulsarAdmin(superUserToken);
+        admin.tenants().createTenant(tenant, TenantInfo.builder()
+                .allowedClusters(Sets.newHashSet(configClusterName)).build());
+        String namespace = "superuser-admin-namespace";
+        admin.namespaces().createNamespace(tenant + "/" + namespace);
+        admin.brokers().getAllDynamicConfigurations();
+        admin.tenants().getTenants();
+        admin.topics().getList(tenant + "/" + namespace);
+    }
+
+    /** A token containing the super-user role can produce to and consume from a topic. */
+    @Test
+    public void testProduceAndConsumeWithSuperUserToken() throws Exception {
+        String tenant = "superuser-client-tenant";
+        @Cleanup
+        PulsarAdmin admin = newPulsarAdmin(superUserToken);
+        admin.tenants().createTenant(tenant, TenantInfo.builder()
+                .allowedClusters(Sets.newHashSet(configClusterName)).build());
+        String namespace = "superuser-client-namespace";
+        admin.namespaces().createNamespace(tenant + "/" + namespace);
+        String topic = tenant + "/" + namespace + "/" + "test-topic";
+
+        @Cleanup
+        PulsarClient client = newPulsarClient(superUserToken);
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer().topic(topic).create();
+        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
+        producer.send(body);
+
+        @Cleanup
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .subscribe();
+        Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(message);
+        assertEquals(message.getData(), body);
+    }
+
+    /** A token without the super-user role is not allowed to create a tenant. */
+    @Test
+    public void testAdminRequestWithNormalUserToken() throws Exception {
+        String tenant = "normaluser-admin-tenant";
+        @Cleanup
+        PulsarAdmin admin = newPulsarAdmin(normalUserToken);
+
+        assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.tenants().createTenant(tenant, TenantInfo.builder()
+                        .allowedClusters(Sets.newHashSet(configClusterName)).build()));
+    }
+
+    /** A token without any granted role is rejected when creating a producer or a consumer. */
+    @Test
+    public void testProduceAndConsumeWithNormalUserToken() throws Exception {
+        String tenant = "normaluser-client-tenant";
+        // The tenant and namespace are provisioned with the super-user token; only the client
+        // below uses the normal-user token.
+        @Cleanup
+        PulsarAdmin admin = newPulsarAdmin(superUserToken);
+        admin.tenants().createTenant(tenant, TenantInfo.builder()
+                .allowedClusters(Sets.newHashSet(configClusterName)).build());
+        String namespace = "normaluser-client-namespace";
+        admin.namespaces().createNamespace(tenant + "/" + namespace);
+        String topic = tenant + "/" + namespace + "/" + "test-topic";
+
+        @Cleanup
+        PulsarClient client = newPulsarClient(normalUserToken);
+        assertThrows(PulsarClientException.AuthorizationException.class, () -> {
+            @Cleanup
+            Producer<byte[]> ignored = client.newProducer().topic(topic).create();
+        });
+
+        assertThrows(PulsarClientException.AuthorizationException.class, () -> {
+            @Cleanup
+            Consumer<byte[]> ignored = client.newConsumer().topic(topic)
+                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscriptionName("test")
+                    .subscribe();
+        });
+    }
+}