You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/04/08 06:06:43 UTC

[pulsar] branch master updated: fix: bug when function auth is not enabled (#3980)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 23926ad  fix: bug when function auth is not enabled (#3980)
23926ad is described below

commit 23926ad5ee60db744a8dcd21dff4ab991acfac2b
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Apr 7 23:06:39 2019 -0700

    fix: bug when function auth is not enabled (#3980)
    
    * fix bug when function auth is not enabled
    
    * add comment and tests
    
    * fix bug
    
    * improving tests
---
 .../pulsar/functions/worker/FunctionActioner.java  | 29 +++++++----
 .../functions/worker/FunctionActionerTest.java     | 59 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 9 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 1488cf2..175d6e5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -154,8 +154,14 @@ public class FunctionActioner {
 
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
 
+        // Check whether functionAuthenticationSpec actually has data; if it does not, use null instead. With protobuf,
+        // a getter never returns null even when the field is unset, so the "has" method must be used to check.
+        Function.FunctionAuthenticationSpec functionAuthenticationSpec
+                = instance.getFunctionMetaData().hasFunctionAuthSpec()
+                ? instance.getFunctionMetaData().getFunctionAuthSpec() : null;
+
         InstanceConfig instanceConfig = createInstanceConfig(functionDetailsBuilder.build(),
-                instance.getFunctionMetaData().getFunctionAuthSpec(),
+                functionAuthenticationSpec,
                 instanceId, workerConfig.getPulsarFunctionsCluster());
 
         RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
@@ -275,15 +281,20 @@ public class FunctionActioner {
 
         if (functionRuntimeInfo.getRuntimeSpawner() != null) {
             functionRuntimeInfo.getRuntimeSpawner().close();
+
             // cleanup any auth data cached
-            try {
-                functionRuntimeInfo.getRuntimeSpawner()
-                        .getRuntimeFactory().getAuthProvider()
-                        .cleanUpAuthData(
-                                details.getTenant(), details.getNamespace(), details.getName(),
-                                getFunctionAuthData(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionAuthSpec()));
-            } catch (Exception e) {
-                log.error("Failed to cleanup auth data for function: {}", fqfn, e);
+            if (functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec() != null) {
+                try {
+                    log.info("{}-{} Cleaning up authentication data for function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
+                    functionRuntimeInfo.getRuntimeSpawner()
+                            .getRuntimeFactory().getAuthProvider()
+                            .cleanUpAuthData(
+                                    details.getTenant(), details.getNamespace(), details.getName(),
+                                    getFunctionAuthData(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionAuthSpec()));
+
+                } catch (Exception e) {
+                    log.error("Failed to cleanup auth data for function: {}", fqfn, e);
+                }
             }
             functionRuntimeInfo.setRuntimeSpawner(null);
         }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index cd636d4..c93569b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -19,19 +19,23 @@
 package org.apache.pulsar.functions.worker;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.testng.annotations.Test;
 import static org.apache.pulsar.common.functions.Utils.FILE;
@@ -147,4 +151,59 @@ public class FunctionActionerTest {
         }
     }
 
+    @Test
+    public void testFunctionAuthDisabled() throws Exception {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+        String downloadDir = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+        workerConfig.setDownloadDirectory(downloadDir);
+
+        RuntimeFactory factory = mock(RuntimeFactory.class);
+        Runtime runtime = mock(Runtime.class);
+        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+        doNothing().when(runtime).start();
+        Namespace dlogNamespace = mock(Namespace.class);
+        final String exceptionMsg = "dl namespace not-found";
+        doThrow(new IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any());
+
+        @SuppressWarnings("resource")
+        FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
+                new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+
+
+        String pkgPathLocation = "http://invalid/my-file.jar";
+        Function.FunctionMetaData functionMeta = Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
+                        .setNamespace("test-namespace").setName("func-1"))
+                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
+                .build();
+
+        Function.Instance instance = Function.Instance.newBuilder()
+                .setFunctionMetaData(functionMeta).build();
+
+        RuntimeSpawner runtimeSpawner = spy(actioner.getRuntimeSpawner(instance, "foo"));
+
+        assertTrue(runtimeSpawner.getInstanceConfig().getFunctionAuthenticationSpec() == null);
+
+        FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
+
+        RuntimeFactory runtimeFactory = mock(RuntimeFactory.class);
+
+        FunctionAuthProvider functionAuthProvider = mock(FunctionAuthProvider.class);
+        doReturn(functionAuthProvider).when(runtimeFactory).getAuthProvider();
+
+        doReturn(runtimeFactory).when(runtimeSpawner).getRuntimeFactory();
+        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+        doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
+
+        actioner.terminateFunction(functionRuntimeInfo);
+
+        // make sure cleanup of cached auth data is never attempted when function auth is not enabled
+        verify(functionAuthProvider, times(0)).cleanUpAuthData(any(), any(), any(), any());
+    }
+
 }