You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/03/06 05:19:38 UTC

[pulsar] branch master updated: Enhance Authorization by adding TenantAdmin interface (#6487)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f33567e  Enhance Authorization by adding TenantAdmin interface (#6487)
f33567e is described below

commit f33567ec6b805a7ae704520ab93ff87b0abb02b6
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Mar 5 21:19:26 2020 -0800

    Enhance Authorization by adding TenantAdmin interface (#6487)
    
    * Enhance Authorization by adding TenantAdmin interface
    
    * Remove debugging comment
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../authorization/AuthorizationProvider.java       | 13 +++++++++++
 .../broker/authorization/AuthorizationService.java |  9 ++++++++
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 +-
 .../pulsar/broker/web/PulsarWebResource.java       | 26 ++++++++++------------
 .../apache/pulsar/broker/admin/NamespacesTest.java |  2 ++
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |  5 +++++
 .../discovery/service/BrokerDiscoveryProvider.java |  2 +-
 .../functions/worker/rest/api/ComponentImpl.java   |  4 ++--
 .../worker/rest/api/FunctionsImplTest.java         | 19 +++++++++++-----
 .../proxy/server/BrokerDiscoveryProvider.java      |  2 +-
 10 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 9787eae..cb65416 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 
 /**
  * Provider of authorization mechanism
@@ -47,6 +48,18 @@ public interface AuthorizationProvider extends Closeable {
     }
 
     /**
+     * Check if the specified role is an admin of the tenant.
+     *
+     * <p>The default implementation looks the role up in the admin roles of {@code tenantInfo};
+     * {@code tenant} and {@code authenticationData} are not consulted by it.
+     *
+     * @param tenant the tenant to check
+     * @param role the role to check; a null role is never an admin
+     * @param tenantInfo the tenant's info holding its admin roles; null is treated as "no admin roles"
+     * @param authenticationData authentication data supplied by the caller
+     * @return a CompletableFuture containing a boolean in which true means the role is an admin user
+     * and false if it is not
+     */
+    default CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
+                                                     AuthenticationDataSource authenticationData) {
+        // Fail closed: a missing role, tenant info or admin-role set means "not an admin".
+        boolean isAdmin = role != null
+                && tenantInfo != null
+                && tenantInfo.getAdminRoles() != null
+                && tenantInfo.getAdminRoles().contains(role);
+        return CompletableFuture.completedFuture(isAdmin);
+    }
+
+    /**
      * Perform initialization for the authorization provider
      *
      * @param conf
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 95ff764..381e8cf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +77,14 @@ public class AuthorizationService {
         return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
     }
 
+    /**
+     * Check whether the given role is an admin of the tenant by delegating to the configured
+     * authorization provider.
+     *
+     * @param tenant the tenant to check
+     * @param role the role to check
+     * @param tenantInfo tenant information, passed through to the provider unchanged
+     * @param authenticationData authentication data, passed through to the provider unchanged
+     * @return the future returned by the provider, or the result of
+     *         {@code FutureUtil.failedFuture} for an {@link IllegalStateException} when no
+     *         provider is configured
+     */
+    public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
+                                                    AuthenticationDataSource authenticationData) {
+        // Guard clause: without a provider there is nothing to delegate to.
+        if (provider == null) {
+            return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
+        }
+        return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData);
+    }
+
     /**
      *
      * Grant authorization-action permission on a namespace to the given client
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 503b21b..e37f09d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1797,7 +1797,7 @@ public class PersistentTopicsBase extends AdminResource {
                 checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
             } catch (RestException e) {
                 try {
-                    validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant());
+                    validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
                 } catch (RestException authException) {
                     log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
                     throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index ca9ec85..497883d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -52,6 +52,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -224,7 +226,7 @@ public abstract class PulsarWebResource {
      */
     protected void validateAdminAccessForTenant(String tenant) {
         try {
-            validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant);
+            validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData());
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
@@ -234,7 +236,8 @@ public abstract class PulsarWebResource {
     }
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
-                                                       String originalPrincipal, String tenant)
+                                                       String originalPrincipal, String tenant,
+                                                       AuthenticationDataSource authenticationData)
             throws RestException, Exception {
         if (log.isDebugEnabled()) {
             log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant,
@@ -259,22 +262,17 @@ public abstract class PulsarWebResource {
             validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId, originalPrincipal);
 
             if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-
                 CompletableFuture<Boolean> isProxySuperUserFuture;
                 CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
                 try {
-                    isProxySuperUserFuture = pulsar.getBrokerService()
-                            .getAuthorizationService()
-                            .isSuperUser(clientAppId);
+                    AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService();
+                    isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId);
 
-                    isOriginalPrincipalSuperUserFuture = pulsar.getBrokerService()
-                            .getAuthorizationService()
-                            .isSuperUser(originalPrincipal);
+                    isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal);
 
-                Set<String> adminRoles = tenantInfo.getAdminRoles();
-                boolean proxyAuthorized = isProxySuperUserFuture.get() || adminRoles.contains(clientAppId);
-                boolean originalPrincipalAuthorized
-                    = isOriginalPrincipalSuperUserFuture.get() || adminRoles.contains(originalPrincipal);
+                    boolean proxyAuthorized = isProxySuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get();
+                    boolean originalPrincipalAuthorized
+                    = isOriginalPrincipalSuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo, authenticationData).get();
                     if (!proxyAuthorized || !originalPrincipalAuthorized) {
                         throw new RestException(Status.UNAUTHORIZED,
                                 String.format("Proxy not authorized to access resource (proxy:%s,original:%s)",
@@ -290,7 +288,7 @@ public abstract class PulsarWebResource {
                         .getAuthorizationService()
                         .isSuperUser(clientAppId)
                         .join()) {
-                    if (!tenantInfo.getAdminRoles().contains(clientAppId)) {
+                    if (!pulsar.getBrokerService().getAuthorizationService().isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) {
                         throw new RestException(Status.UNAUTHORIZED,
                                 "Don't have permission to administrate resources on this tenant");
                     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index a607921..0f7d100 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -145,6 +145,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(false).when(namespaces).isRequestHttps();
         doReturn("test").when(namespaces).clientAppId();
         doReturn(null).when(namespaces).originalPrincipal();
+        doReturn(null).when(namespaces).clientAuthData();
         doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
         doNothing().when(namespaces).validateAdminAccessForTenant(this.testTenant);
         doNothing().when(namespaces).validateAdminAccessForTenant("non-existing-tenant");
@@ -987,6 +988,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(false).when(topics).isRequestHttps();
         doReturn("test").when(topics).clientAppId();
         doReturn(null).when(topics).originalPrincipal();
+        doReturn(null).when(topics).clientAuthData();
         mockWebUrl(localWebServiceUrl, testNs);
         doReturn("persistent").when(topics).domain();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 8b7e92e..ed7a0dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -42,6 +42,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -109,13 +111,16 @@ public class PulsarFunctionTlsTest {
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         config.setAuthenticationEnabled(true);
+        config.setAuthorizationEnabled(true);
         config.setAuthenticationProviders(providers);
         config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config.setTlsAllowInsecureConnection(true);
         functionsWorkerService = spy(createPulsarFunctionWorker(config));
         AuthenticationService authenticationService = new AuthenticationService(config);
+        AuthorizationService authorizationService = new AuthorizationService(config, mock(ConfigurationCacheService.class));
         when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
+        when(functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService);
         when(functionsWorkerService.isInitialized()).thenReturn(true);
 
         PulsarAdmin admin = mock(PulsarAdmin.class);
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 8b3a012..de3fef1 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -155,7 +155,7 @@ public class BrokerDiscoveryProvider implements Closeable {
                 throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
                         topicName.getTenant(), e.getMessage()));
             }
-            if (!tenantInfo.getAdminRoles().contains(role)) {
+            if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
                 throw new IllegalAccessException("Don't have permission to administrate resources on this property");
             }
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 98cc3a5..cab2b07 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1449,10 +1449,10 @@ public abstract class ComponentImpl {
             if (clientRole != null) {
                 try {
                     TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-                    if (tenantInfo != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(clientRole)) {
+                    if (tenantInfo != null && worker().getAuthorizationService().isTenantAdmin(tenant, clientRole, tenantInfo, authenticationData).get()) {
                         return true;
                     }
-                } catch (PulsarAdminException.NotFoundException e) {
+                } catch (PulsarAdminException.NotFoundException | InterruptedException | ExecutionException e) {
 
                 }
             }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 258c1237..cd55b2a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker.rest.api;
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -60,6 +61,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
@@ -230,51 +232,56 @@ public class FunctionsImplTest {
     }
 
     @Test
-    public void testIsAuthorizedRole() throws PulsarAdminException {
-
+    public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedException, ExecutionException {
 
+        TenantInfo tenantInfo = new TenantInfo();
+        AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
         FunctionsImpl functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
+        AuthorizationService authorizationService = mock(AuthorizationService.class);
+        doReturn(authorizationService).when(mockedWorkerService).getAuthorizationService();
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setAuthorizationEnabled(true);
         workerConfig.setSuperUserRoles(Collections.singleton(superUser));
         doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
 
         // test super user
-        AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
         assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", superUser, authenticationDataSource));
 
         // test normal user
         functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
         doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any());
         Tenants tenants = mock(Tenants.class);
-        when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo());
+        when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
         PulsarAdmin admin = mock(PulsarAdmin.class);
         when(admin.tenants()).thenReturn(tenants);
         when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
+        when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
         assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));
 
         // if user is tenant admin
         functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
         doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any());
         tenants = mock(Tenants.class);
-        TenantInfo tenantInfo = new TenantInfo();
         tenantInfo.setAdminRoles(Collections.singleton("test-user"));
         when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
 
         admin = mock(PulsarAdmin.class);
         when(admin.tenants()).thenReturn(tenants);
         when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
+        when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
         assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));
 
         // test user allow function action
         functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
         doReturn(true).when(functionImpl).allowFunctionOps(any(), any(), any());
         tenants = mock(Tenants.class);
-        when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo());
+        tenantInfo.setAdminRoles(Collections.emptySet());
+        when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
 
         admin = mock(PulsarAdmin.class);
         when(admin.tenants()).thenReturn(tenants);
         when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
+        when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
         assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));
 
         // test role is null
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index 28f28cd..e055872 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -153,7 +153,7 @@ public class BrokerDiscoveryProvider implements Closeable {
                 throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
                         topicName.getTenant(), e.getMessage()));
             }
-            if (!tenantInfo.getAdminRoles().contains(role)) {
+            if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
                 throw new IllegalAccessException("Don't have permission to administrate resources on this tenant");
             }
         }