You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/01/23 12:37:31 UTC
[pulsar] branch master updated: [Functions] Fixes function worker
get superuser role (#9259)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d3f8440 [Functions] Fixes function worker get superuser role (#9259)
d3f8440 is described below
commit d3f8440e06691736e686a76127680779b9dabfa8
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Sat Jan 23 20:37:00 2021 +0800
[Functions] Fixes function worker get superuser role (#9259)
* Fixes function worker get superuser role
---
Fixes #7879
*Motivation*
The function worker should use the authorization service to check
whether a role is a superuser.
*Modifications*
- Fix the isSuperUser method in the function worker
*Verify this change*
Please pick either of the following options.
- Adjust the original test case to verify it
* Fix the tests
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 36 +++++++++++++++++-----
.../functions/worker/rest/api/ComponentImpl.java | 23 ++++++++++++--
.../worker/rest/api/FunctionsImplTest.java | 23 +++++++++++---
3 files changed, 68 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 6ccd74f..5f9c249 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -22,12 +22,17 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -54,6 +59,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -67,6 +73,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
@@ -1515,15 +1522,30 @@ public class PulsarFunctionE2ETest {
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);
- String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader()
+ .getResource("pulsar-functions-api-examples.jar").getFile();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
- try {
- admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
- assertTrue(validRoleName);
- } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
- assertFalse(validRoleName);
+ if (!validRoleName) {
+ // create a non-superuser admin to test the api
+ admin = spy(
+ PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddressTls())
+ .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+ .allowTlsInsecureConnection(true).build());
+ try {
+ admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+ } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
+ assertFalse(validRoleName);
+ }
+ } else {
+ try {
+ admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+ assertTrue(validRoleName);
+ } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
+ fail();
+ }
}
+
}
@Test(timeOut = 20000)
@@ -1815,4 +1837,4 @@ public class PulsarFunctionE2ETest {
double value;
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index fdae616..3a35628 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -98,6 +98,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -1530,9 +1531,25 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
}
public boolean isSuperUser(String clientRole) {
- return clientRole != null
- && worker().getWorkerConfig().getSuperUserRoles() != null
- && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
+ if (clientRole != null) {
+ try {
+ if ((worker().getWorkerConfig().getSuperUserRoles() != null
+ && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))) {
+ return true;
+ }
+ return worker().getAuthorizationService().isSuperUser(clientRole, null)
+ .get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Time-out {} sec while checking the role {} is a super user role ",
+ worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), clientRole);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ } catch (Exception e) {
+ log.warn("Admin-client with Role - failed to check the role {} is a super user role {} ", clientRole,
+ e.getMessage(), e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ }
+ return false;
}
public boolean allowFunctionOps(NamespaceName namespaceName, String role,
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 4d2867f..beb0cf9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -64,6 +63,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.eq;
@@ -248,6 +248,12 @@ public class FunctionsImplTest {
// test super user
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", superUser, authenticationDataSource));
+ // test pulsar super user
+ final String pulsarSuperUser = "pulsarSuperUser";
+ when(authorizationService.isSuperUser(pulsarSuperUser, null)).thenReturn(CompletableFuture.completedFuture(true));
+ assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", pulsarSuperUser, authenticationDataSource));
+ assertTrue(functionImpl.isSuperUser(pulsarSuperUser));
+
// test normal user
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any());
@@ -257,6 +263,7 @@ public class FunctionsImplTest {
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
+ when(authorizationService.isSuperUser("test-user", null)).thenReturn(CompletableFuture.completedFuture(false));
assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));
// if user is tenant admin
@@ -270,6 +277,7 @@ public class FunctionsImplTest {
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
+ when(authorizationService.isSuperUser("test-user", authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));
// test user allow function action
@@ -301,10 +309,17 @@ public class FunctionsImplTest {
public void testIsSuperUser() throws PulsarAdminException {
FunctionsImpl functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
+ AuthorizationService authorizationService = mock(AuthorizationService.class);
+ doReturn(authorizationService).when(mockedWorkerService).getAuthorizationService();
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setAuthorizationEnabled(true);
workerConfig.setSuperUserRoles(Collections.singleton(superUser));
doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
+ when(authorizationService.isSuperUser(anyString(), any()))
+ .thenAnswer((invocationOnMock) -> {
+ String role = invocationOnMock.getArgument(0, String.class);
+ return CompletableFuture.completedFuture(superUser.equals(role));
+ });
AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
assertTrue(functionImpl.isSuperUser(superUser));
@@ -312,13 +327,13 @@ public class FunctionsImplTest {
assertFalse(functionImpl.isSuperUser("normal-user"));
assertFalse(functionImpl.isSuperUser( null));
- // test super roles is null
-
+ // test super roles is null and it's not a pulsar super user
+ when(authorizationService.isSuperUser(superUser, null))
+ .thenReturn(CompletableFuture.completedFuture(false));
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
workerConfig = new WorkerConfig();
workerConfig.setAuthorizationEnabled(true);
doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
-
assertFalse(functionImpl.isSuperUser(superUser));
}