You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/01 17:54:14 UTC
[pulsar] branch master updated: Fix: Function auth should ignore
exception because it might be anonymous user (#4185)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e5b691b Fix: Function auth should ignore exception because it might be anonymous user (#4185)
e5b691b is described below
commit e5b691b77fa70f529e5bb0c8ae668478a6ca4d1b
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed May 1 10:54:08 2019 -0700
Fix: Function auth should ignore exception because it might be anonymous user (#4185)
* ignore exception because of anonymous user
* fix function authorization bug involving a tenant with no admin
* cleaning up comments
* add tests
* adding tests
---
.../worker/PulsarFunctionE2ESecurityTest.java | 259 +++++++++++++++++++++
.../auth/KubernetesSecretsTokenAuthProvider.java | 3 +-
2 files changed, 261 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 88bb83e..775a4bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -100,6 +101,7 @@ public class PulsarFunctionE2ESecurityTest {
private static final String SUBJECT = "my-test-subject";
private static final String ADMIN_SUBJECT = "superUser";
+ private static final String ANONYMOUS_ROLE = "anonymousUser";
private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
private String adminToken;
@@ -137,6 +139,7 @@ public class PulsarFunctionE2ESecurityTest {
config.setAuthenticationProviders(providers);
config.setAuthorizationEnabled(true);
config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ config.setAnonymousUserRole(ANONYMOUS_ROLE);
secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
Properties properties = new Properties();
properties.setProperty("tokenSecretKey",
@@ -261,6 +264,262 @@ public class PulsarFunctionE2ESecurityTest {
}
@Test
+ public void testAuthorizationWithAnonymousUser() throws Exception {
+
+ final String replNamespace = TENANT + "/" + NAMESPACE;
+ final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String functionName = "PulsarFunction-test";
+ final String subscriptionName = "test-sub";
+
+
+
+ try (PulsarAdmin admin1 = spy(
+ PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).build())
+ ) {
+
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+
+ FunctionConfig functionConfig = createFunctionConfig(TENANT, NAMESPACE, functionName,
+ sourceTopic, sinkTopic, subscriptionName);
+
+ FunctionConfig functionConfig2 = createFunctionConfig(TENANT2, NAMESPACE, functionName,
+ sourceTopic, sinkTopic, subscriptionName);
+
+ // creating function should fail since admin1 doesn't have permissions granted yet
+ try {
+ admin1.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+ fail("client admin shouldn't have permissions to create function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+
+ // grant permissions to anonymous role
+ Set<AuthAction> actions = new HashSet<>();
+ actions.add(AuthAction.functions);
+ actions.add(AuthAction.produce);
+ actions.add(AuthAction.consume);
+ superUserAdmin.namespaces().grantPermissionOnNamespace(replNamespace, ANONYMOUS_ROLE, actions);
+
+ // user should be able to create function now
+ admin1.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ // should still fail on a different namespace
+ try {
+ admin1.functions().createFunctionWithUrl(functionConfig2, jarFilePathUrl);
+ fail("client admin shouldn't have permissions to create function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+
+ retryStrategically((test) -> {
+ try {
+ return admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning() == 1
+ && admin1.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ // validate pulsar sink consumer has started on the topic
+ assertEquals(admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning(), 1);
+ assertEquals(admin1.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+ // create a producer that creates a topic at broker
+ try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe()) {
+
+ int totalMsgs = 5;
+ for (int i = 0; i < totalMsgs; i++) {
+ String data = "my-message-" + i;
+ producer.newMessage().property(propertyKey, propertyValue).value(data).send();
+ }
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats = admin1.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ return subStats.unackedMessages == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedPropertyValue = msg.getProperty(propertyKey);
+ assertEquals(propertyValue, receivedPropertyValue);
+
+
+ // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked
+ // messages
+ // due to publish failure
+ assertNotEquals(admin1.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+ totalMsgs);
+
+ // test update functions
+ functionConfig.setParallelism(2);
+ functionConfig2.setParallelism(2);
+
+ try {
+ admin1.functions().updateFunctionWithUrl(functionConfig2, jarFilePathUrl);
+ fail("client admin shouldn't have permissions to update function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+
+ admin1.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning() == 2;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ assertEquals(admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning(), 2);
+
+ // test getFunctionInfo
+ try {
+ admin1.functions().getFunction(TENANT2, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to get function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunction(TENANT, NAMESPACE, functionName);
+
+ // test getFunctionInstanceStatus
+ try {
+ admin1.functions().getFunctionStatus(TENANT2, NAMESPACE, functionName, 0);
+ fail("client admin shouldn't have permissions to get function status");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName, 0);
+
+ // test getFunctionStatus
+ try {
+ admin1.functions().getFunctionStatus(TENANT2, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to get function status");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName);
+
+ // test getFunctionStats
+ try {
+ admin1.functions().getFunctionStats(TENANT2, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to get function stats");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctionStats(TENANT, NAMESPACE, functionName);
+
+ // test getFunctionInstanceStats
+ try {
+ admin1.functions().getFunctionStats(TENANT2, NAMESPACE, functionName, 0);
+ fail("client admin shouldn't have permissions to get function stats");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctionStats(TENANT, NAMESPACE, functionName, 0);
+
+ // test listFunctions
+ try {
+ admin1.functions().getFunctions(TENANT2, NAMESPACE);
+ fail("client admin shouldn't have permissions to list functions");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctions(TENANT, NAMESPACE);
+
+ // test triggerFunction
+ try {
+ admin1.functions().triggerFunction(TENANT2, NAMESPACE, functionName, sourceTopic, "foo", null);
+ fail("client admin shouldn't have permissions to trigger function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().triggerFunction(TENANT, NAMESPACE, functionName, sourceTopic, "foo", null);
+
+ // test restartFunctionInstance
+ try {
+ admin1.functions().restartFunction(TENANT2, NAMESPACE, functionName, 0);
+ fail("client admin shouldn't have permissions to restart function instance");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().restartFunction(TENANT, NAMESPACE, functionName, 0);
+
+ // test restartFunctionInstances
+ try {
+ admin1.functions().restartFunction(TENANT2, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to restart function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+ // test stopFunction instance
+ try {
+ admin1.functions().stopFunction(TENANT2, NAMESPACE, functionName, 0);
+ fail("client admin shouldn't have permissions to stop function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().stopFunction(TENANT, NAMESPACE, functionName, 0);
+
+ // test stopFunction all instance
+ try {
+ admin1.functions().stopFunction(TENANT2, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to restart function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().stopFunction(TENANT, NAMESPACE, functionName);
+
+ // test startFunction instance
+ try {
+ admin1.functions().startFunction(TENANT2, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to restart function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+ // test startFunction all instances
+ try {
+ admin1.functions().restartFunction(TENANT2, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to restart function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+ // delete functions
+ try {
+ admin1.functions().deleteFunction(TENANT2, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to delete function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+
+ admin1.functions().deleteFunction(TENANT, NAMESPACE, functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return admin1.topics().getStats(sourceTopic).subscriptions.size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure subscriptions are cleaned up
+ assertEquals(admin1.topics().getStats(sourceTopic).subscriptions.size(), 0);
+ }
+ }
+ }
+
+ @Test
public void testAuthorization() throws Exception {
String token1 = AuthTokenUtils.createToken(secretKey, SUBJECT, Optional.empty());
String token2 = AuthTokenUtils.createToken(secretKey, "wrong-subject", Optional.empty());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index 606cd86..b29b940 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -102,7 +102,8 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
id = createSecret(token, tenant, namespace, name);
}
} catch (Exception e) {
- throw new RuntimeException(e);
+ log.warn("Failed to get token for function {}", FunctionCommon.getFullyQualifiedName(tenant, namespace, name), e);
+ // ignore exception and continue since anonymous user might be used
}
if (id != null) {