You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/01 17:54:14 UTC

[pulsar] branch master updated: Fix: Function auth should ignore exception because it might be anonymous user (#4185)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e5b691b  Fix: Function auth should ignore exception because it might be anonymous user (#4185)
e5b691b is described below

commit e5b691b77fa70f529e5bb0c8ae668478a6ca4d1b
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed May 1 10:54:08 2019 -0700

    Fix: Function auth should ignore exception because it might be anonymous user (#4185)
    
    * ignore exception because of anonymous user
    
    * fix function authorization bug involving a tenant with no admin
    
    * cleaning up comments
    
    * add tests
    
    * adding tests
---
 .../worker/PulsarFunctionE2ESecurityTest.java      | 259 +++++++++++++++++++++
 .../auth/KubernetesSecretsTokenAuthProvider.java   |   3 +-
 2 files changed, 261 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 88bb83e..775a4bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -100,6 +101,7 @@ public class PulsarFunctionE2ESecurityTest {
 
     private static final String SUBJECT = "my-test-subject";
     private static final String ADMIN_SUBJECT = "superUser";
+    private static final String ANONYMOUS_ROLE = "anonymousUser";
 
     private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
     private String adminToken;
@@ -137,6 +139,7 @@ public class PulsarFunctionE2ESecurityTest {
         config.setAuthenticationProviders(providers);
         config.setAuthorizationEnabled(true);
         config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        config.setAnonymousUserRole(ANONYMOUS_ROLE);
         secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
         Properties properties = new Properties();
         properties.setProperty("tokenSecretKey",
@@ -261,6 +264,262 @@ public class PulsarFunctionE2ESecurityTest {
     }
 
     @Test
+    public void testAuthorizationWithAnonymousUser() throws Exception {
+
+        final String replNamespace = TENANT + "/" + NAMESPACE;
+        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarFunction-test";
+        final String subscriptionName = "test-sub";
+
+
+
+        try (PulsarAdmin admin1 = spy(
+                PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).build())
+        ) {
+
+            String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+
+            FunctionConfig functionConfig = createFunctionConfig(TENANT, NAMESPACE, functionName,
+                    sourceTopic, sinkTopic, subscriptionName);
+
+            FunctionConfig functionConfig2 =  createFunctionConfig(TENANT2, NAMESPACE, functionName,
+                    sourceTopic, sinkTopic, subscriptionName);
+
+            // creating function should fail since admin1 doesn't have permissions granted yet
+            try {
+                admin1.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+                fail("client admin shouldn't have permissions to create function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+
+            // grant permissions to annoynmous role
+            Set<AuthAction> actions = new HashSet<>();
+            actions.add(AuthAction.functions);
+            actions.add(AuthAction.produce);
+            actions.add(AuthAction.consume);
+            superUserAdmin.namespaces().grantPermissionOnNamespace(replNamespace, ANONYMOUS_ROLE, actions);
+
+            // user should be able to create function now
+            admin1.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+            // should still fail on a different namespace
+            try {
+                admin1.functions().createFunctionWithUrl(functionConfig2, jarFilePathUrl);
+                fail("client admin shouldn't have permissions to create function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+
+            retryStrategically((test) -> {
+                try {
+                    return admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning() == 1
+                            && admin1.topics().getStats(sourceTopic).subscriptions.size() == 1;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 5, 150);
+            // validate pulsar sink consumer has started on the topic
+            assertEquals(admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning(), 1);
+            assertEquals(admin1.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+            // create a producer that creates a topic at broker
+            try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+                 Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe()) {
+
+                int totalMsgs = 5;
+                for (int i = 0; i < totalMsgs; i++) {
+                    String data = "my-message-" + i;
+                    producer.newMessage().property(propertyKey, propertyValue).value(data).send();
+                }
+                retryStrategically((test) -> {
+                    try {
+                        SubscriptionStats subStats = admin1.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                        return subStats.unackedMessages == 0;
+                    } catch (PulsarAdminException e) {
+                        return false;
+                    }
+                }, 5, 150);
+
+                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+                String receivedPropertyValue = msg.getProperty(propertyKey);
+                assertEquals(propertyValue, receivedPropertyValue);
+
+
+                // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked
+                // messages
+                // due to publish failure
+                assertNotEquals(admin1.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+                        totalMsgs);
+
+                // test update functions
+                functionConfig.setParallelism(2);
+                functionConfig2.setParallelism(2);
+
+                try {
+                    admin1.functions().updateFunctionWithUrl(functionConfig2, jarFilePathUrl);
+                    fail("client admin shouldn't have permissions to update function");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+
+                admin1.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+                retryStrategically((test) -> {
+                    try {
+                        return admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning() == 2;
+                    } catch (PulsarAdminException e) {
+                        return false;
+                    }
+                }, 5, 150);
+
+                assertEquals(admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning(), 2);
+
+                // test getFunctionInfo
+                try {
+                    admin1.functions().getFunction(TENANT2, NAMESPACE, functionName);
+                    fail("client admin shouldn't have permissions to get function");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().getFunction(TENANT, NAMESPACE, functionName);
+
+                // test getFunctionInstanceStatus
+                try {
+                    admin1.functions().getFunctionStatus(TENANT2, NAMESPACE, functionName, 0);
+                    fail("client admin shouldn't have permissions to get function status");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName, 0);
+
+                // test getFunctionStatus
+                try {
+                    admin1.functions().getFunctionStatus(TENANT2, NAMESPACE, functionName);
+                    fail("client admin shouldn't have permissions to get function status");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName);
+
+                // test getFunctionStats
+                try {
+                    admin1.functions().getFunctionStats(TENANT2, NAMESPACE, functionName);
+                    fail("client admin shouldn't have permissions to get function stats");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().getFunctionStats(TENANT, NAMESPACE, functionName);
+
+                // test getFunctionInstanceStats
+                try {
+                    admin1.functions().getFunctionStats(TENANT2, NAMESPACE, functionName, 0);
+                    fail("client admin shouldn't have permissions to get function stats");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().getFunctionStats(TENANT, NAMESPACE, functionName, 0);
+
+                // test listFunctions
+                try {
+                    admin1.functions().getFunctions(TENANT2, NAMESPACE);
+                    fail("client admin shouldn't have permissions to list functions");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().getFunctions(TENANT, NAMESPACE);
+
+                // test triggerFunction
+                try {
+                    admin1.functions().triggerFunction(TENANT2, NAMESPACE, functionName, sourceTopic, "foo", null);
+                    fail("client admin shouldn't have permissions to trigger function");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().triggerFunction(TENANT, NAMESPACE, functionName, sourceTopic, "foo", null);
+
+                // test restartFunctionInstance
+                try {
+                    admin1.functions().restartFunction(TENANT2, NAMESPACE, functionName, 0);
+                    fail("client admin shouldn't have permissions to restart function instance");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().restartFunction(TENANT, NAMESPACE, functionName, 0);
+
+                // test restartFunctionInstances
+                try {
+                    admin1.functions().restartFunction(TENANT2, NAMESPACE, functionName);
+                    fail("client admin shouldn't have permissions to restart function");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+                // test stopFunction instance
+                try {
+                    admin1.functions().stopFunction(TENANT2, NAMESPACE, functionName, 0);
+                    fail("client admin shouldn't have permissions to stop function");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().stopFunction(TENANT, NAMESPACE, functionName, 0);
+
+                // test stopFunction all instance
+                try {
+                    admin1.functions().stopFunction(TENANT2, NAMESPACE, functionName);
+                    fail("client admin shouldn't have permissions to restart function");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().stopFunction(TENANT, NAMESPACE, functionName);
+
+                // test startFunction instance
+                try {
+                    admin1.functions().startFunction(TENANT2, NAMESPACE, functionName);
+                    fail("client admin shouldn't have permissions to restart function");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+                // test startFunction all instances
+                try {
+                    admin1.functions().restartFunction(TENANT2, NAMESPACE, functionName);
+                    fail("client admin shouldn't have permissions to restart function");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+                admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+                // delete functions
+                try {
+                    admin1.functions().deleteFunction(TENANT2, NAMESPACE, functionName);
+                    fail("client admin shouldn't have permissions to delete function");
+                } catch (PulsarAdminException.NotAuthorizedException e) {
+
+                }
+
+                admin1.functions().deleteFunction(TENANT, NAMESPACE, functionName);
+
+                retryStrategically((test) -> {
+                    try {
+                        return admin1.topics().getStats(sourceTopic).subscriptions.size() == 0;
+                    } catch (PulsarAdminException e) {
+                        return false;
+                    }
+                }, 5, 150);
+
+                // make sure subscriptions are cleanup
+                assertEquals(admin1.topics().getStats(sourceTopic).subscriptions.size(), 0);
+            }
+        }
+    }
+
+    @Test
     public void testAuthorization() throws Exception {
         String token1 = AuthTokenUtils.createToken(secretKey, SUBJECT, Optional.empty());
         String token2 = AuthTokenUtils.createToken(secretKey, "wrong-subject", Optional.empty());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index 606cd86..b29b940 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -102,7 +102,8 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
                 id = createSecret(token, tenant, namespace, name);
             }
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            log.warn("Failed to get token for function {}", FunctionCommon.getFullyQualifiedName(tenant, namespace, name), e);
+            // ignore exception and continue since anonymous user might to used
         }
 
         if (id != null) {