You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/01/23 12:37:31 UTC

[pulsar] branch master updated: [Functions] Fixes function worker get superuser role (#9259)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d3f8440  [Functions] Fixes function worker get superuser role (#9259)
d3f8440 is described below

commit d3f8440e06691736e686a76127680779b9dabfa8
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Sat Jan 23 20:37:00 2021 +0800

    [Functions] Fixes function worker get superuser role (#9259)
    
    * Fixes function worker get superuser role
    ---
    
    Fixes #7879
    
    *Motivation*
    
    The function worker should use the authorization service to check
    whether a role is a superuser.
    
    *Modifications*
    
    - Fix the isSuperUser method in the function worker (ComponentImpl)
    
    *Verify this change*
    
    Please pick either of the following options.
    
    - Adjust the original test case to verify it
    
    * Fix the tests
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 36 +++++++++++++++++-----
 .../functions/worker/rest/api/ComponentImpl.java   | 23 ++++++++++++--
 .../worker/rest/api/FunctionsImplTest.java         | 23 +++++++++++---
 3 files changed, 68 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 6ccd74f..5f9c249 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -22,12 +22,17 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -54,6 +59,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -67,6 +73,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.client.admin.BrokerStats;
@@ -1515,15 +1522,30 @@ public class PulsarFunctionE2ETest {
         propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
 
-        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader()
+            .getResource("pulsar-functions-api-examples.jar").getFile();
         FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                 "my.*", sinkTopic, subscriptionName);
-        try {
-            admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
-            assertTrue(validRoleName);
-        } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
-            assertFalse(validRoleName);
+        if (!validRoleName) {
+            // create a non-superuser admin to test the api
+            admin = spy(
+                PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddressTls())
+                    .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                    .allowTlsInsecureConnection(true).build());
+            try {
+                admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+            } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
+                assertFalse(validRoleName);
+            }
+        } else {
+            try {
+                admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+                assertTrue(validRoleName);
+            } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
+                fail();
+            }
         }
+
     }
 
     @Test(timeOut = 20000)
@@ -1815,4 +1837,4 @@ public class PulsarFunctionE2ETest {
         double value;
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index fdae616..3a35628 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -98,6 +98,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
@@ -1530,9 +1531,25 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
     }
 
     public boolean isSuperUser(String clientRole) {
-        return clientRole != null
-                && worker().getWorkerConfig().getSuperUserRoles() != null
-                && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
+        if (clientRole != null) {
+            try {
+                if ((worker().getWorkerConfig().getSuperUserRoles() != null
+                    && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))) {
+                    return true;
+                }
+                return worker().getAuthorizationService().isSuperUser(clientRole, null)
+                    .get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+            } catch (InterruptedException e) {
+                log.warn("Time-out {} sec while checking the role {} is a super user role ",
+                    worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), clientRole);
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            } catch (Exception e) {
+                log.warn("Admin-client with Role - failed to check the role {} is a super user role {} ", clientRole,
+                    e.getMessage(), e);
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            }
+        }
+        return false;
     }
 
     public boolean allowFunctionOps(NamespaceName namespaceName, String role,
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 4d2867f..beb0cf9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -64,6 +63,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.eq;
@@ -248,6 +248,12 @@ public class FunctionsImplTest {
         // test super user
         assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", superUser, authenticationDataSource));
 
+        // test pulsar super user
+        final String pulsarSuperUser = "pulsarSuperUser";
+        when(authorizationService.isSuperUser(pulsarSuperUser, null)).thenReturn(CompletableFuture.completedFuture(true));
+        assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", pulsarSuperUser, authenticationDataSource));
+        assertTrue(functionImpl.isSuperUser(pulsarSuperUser));
+
         // test normal user
         functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
         doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any());
@@ -257,6 +263,7 @@ public class FunctionsImplTest {
         when(admin.tenants()).thenReturn(tenants);
         when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
         when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
+        when(authorizationService.isSuperUser("test-user", null)).thenReturn(CompletableFuture.completedFuture(false));
         assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));
 
         // if user is tenant admin
@@ -270,6 +277,7 @@ public class FunctionsImplTest {
         when(admin.tenants()).thenReturn(tenants);
         when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
         when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
+        when(authorizationService.isSuperUser("test-user", authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
         assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));
 
         // test user allow function action
@@ -301,10 +309,17 @@ public class FunctionsImplTest {
     public void testIsSuperUser() throws PulsarAdminException {
 
         FunctionsImpl functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
+        AuthorizationService authorizationService = mock(AuthorizationService.class);
+        doReturn(authorizationService).when(mockedWorkerService).getAuthorizationService();
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setAuthorizationEnabled(true);
         workerConfig.setSuperUserRoles(Collections.singleton(superUser));
         doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
+        when(authorizationService.isSuperUser(anyString(), any()))
+            .thenAnswer((invocationOnMock) -> {
+                String role = invocationOnMock.getArgument(0, String.class);
+                return CompletableFuture.completedFuture(superUser.equals(role));
+            });
 
         AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
         assertTrue(functionImpl.isSuperUser(superUser));
@@ -312,13 +327,13 @@ public class FunctionsImplTest {
         assertFalse(functionImpl.isSuperUser("normal-user"));
         assertFalse(functionImpl.isSuperUser( null));
 
-        // test super roles is null
-
+        // test super roles is null and it's not a pulsar super user
+        when(authorizationService.isSuperUser(superUser, null))
+            .thenReturn(CompletableFuture.completedFuture(false));
         functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
         workerConfig = new WorkerConfig();
         workerConfig.setAuthorizationEnabled(true);
         doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
-
         assertFalse(functionImpl.isSuperUser(superUser));
     }