You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:24:41 UTC

[pulsar] branch master updated: [pulsar-function] fix backward compatibility with 2.2 auth not working (#4241)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d75f1f  [pulsar-function] fix backward compatibility with 2.2 auth not working (#4241)
4d75f1f is described below

commit 4d75f1f129c1771eea60dfee5f4655979121e311
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat May 18 17:24:35 2019 -0700

    [pulsar-function] fix backward compatibility with 2.2 auth not working (#4241)
    
    * [pulsar-function] fix backward compatibility with 2.2 auth not working
    
    * Fix test
    
    * remove authentication from upload and download api
---
 .../apache/pulsar/broker/admin/v2/Functions.java   | 23 ++++++------
 .../functions/worker/rest/api/FunctionsImplV2.java | 42 +++++++++++-----------
 .../worker/rest/api/v2/FunctionApiV2Resource.java  | 27 +++++++-------
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 12 +++----
 .../rest/api/v3/FunctionApiV3ResourceTest.java     | 13 +++++--
 5 files changed, 65 insertions(+), 52 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index 25995d2..051bcc6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -142,7 +142,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
                                     final @PathParam("functionName") String functionName) throws IOException {
 
         return functions.getFunctionInfo(
-                tenant, namespace, functionName);
+                tenant, namespace, functionName, clientAppId());
     }
 
     @GET
@@ -161,7 +161,8 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
 
-        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri(),
+                clientAppId());
     }
 
     @GET
@@ -178,7 +179,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStatusV2(
-                tenant, namespace, functionName, uri.getRequestUri());
+                tenant, namespace, functionName, uri.getRequestUri(), clientAppId());
     }
 
     @GET
@@ -194,7 +195,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
     @Path("/{tenant}/{namespace}")
     public Response listFunctions(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
-        return functions.listFunctions( tenant, namespace);
+        return functions.listFunctions( tenant, namespace, clientAppId());
     }
 
     @POST
@@ -216,7 +217,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
                                     final @FormDataParam("data") String triggerValue,
                                     final @FormDataParam("dataStream") InputStream triggerStream,
                                     final @FormDataParam("topic") String topic) {
-        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientAppId());
     }
 
     @GET
@@ -235,7 +236,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
                                      final @PathParam("namespace") String namespace,
                                      final @PathParam("functionName") String functionName,
                                      final @PathParam("key") String key) {
-        return functions.getFunctionState(tenant, namespace, functionName, key);
+        return functions.getFunctionState(tenant, namespace, functionName, key, clientAppId());
     }
 
     @POST
@@ -248,7 +249,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
     public Response restartFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
                                     final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId());
     }
 
     @POST
@@ -260,7 +261,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.restartFunctionInstances(tenant, namespace, functionName);
+        return functions.restartFunctionInstances(tenant, namespace, functionName, clientAppId());
     }
 
     @POST
@@ -273,7 +274,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
     public Response stopFunction(final @PathParam("tenant") String tenant,
                                  final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
                                  final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId());
     }
 
     @POST
@@ -285,7 +286,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopFunction(final @PathParam("tenant") String tenant,
                                  final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.stopFunctionInstances(tenant, namespace, functionName);
+        return functions.stopFunctionInstances(tenant, namespace, functionName, clientAppId());
     }
 
     @POST
@@ -307,7 +308,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
     )
     @Path("/download")
     public Response downloadFunction(final @QueryParam("path") String path) {
-        return functions.downloadFunction(path);
+        return functions.downloadFunction(path, clientAppId());
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index d8b8a4f..b1da329 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -57,11 +57,11 @@ public class FunctionsImplV2 {
         this.delegate = delegate;
     }
 
-    public Response getFunctionInfo(final String tenant, final String namespace, final String functionName)
+    public Response getFunctionInfo(final String tenant, final String namespace, final String functionName, String clientRole)
             throws IOException {
 
         // run just for parameter checks
-        delegate.getFunctionInfo(tenant, namespace, functionName, null, null);
+        delegate.getFunctionInfo(tenant, namespace, functionName, clientRole, null);
 
         FunctionMetaDataManager functionMetaDataManager = delegate.worker().getFunctionMetaDataManager();
 
@@ -72,18 +72,18 @@ public class FunctionsImplV2 {
     }
 
     public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
-                                              final String instanceId, URI uri) throws IOException {
+                                              final String instanceId, URI uri, String clientRole) throws IOException {
 
         org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
-                functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri, null, null);
+                functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri, clientRole, null);
 
         String jsonResponse = FunctionCommon.printJson(toProto(functionInstanceStatus, instanceId));
         return Response.status(Response.Status.OK).entity(jsonResponse).build();
     }
 
-    public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri) throws
+    public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri, String clientRole) throws
             IOException {
-        FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri, null, null);
+        FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri, clientRole, null);
         InstanceCommunication.FunctionStatusList.Builder functionStatusList = InstanceCommunication.FunctionStatusList.newBuilder();
         functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList(
                 toProto(functionInstanceStatus.getStatus(),
@@ -142,20 +142,20 @@ public class FunctionsImplV2 {
         return Response.ok().build();
     }
 
-    public Response listFunctions(String tenant, String namespace) {
-        Collection<String> functionStateList = delegate.listFunctions( tenant, namespace, null, null);
+    public Response listFunctions(String tenant, String namespace, String clientRole) {
+        Collection<String> functionStateList = delegate.listFunctions( tenant, namespace, clientRole, null);
         return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
     }
 
     public Response triggerFunction(String tenant, String namespace, String functionName, String triggerValue,
-                                    InputStream triggerStream, String topic) {
-        String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, null, null);
+                                    InputStream triggerStream, String topic, String clientRole) {
+        String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientRole, null);
         return Response.status(Response.Status.OK).entity(result).build();
     }
 
-    public Response getFunctionState(String tenant, String namespace, String functionName, String key) {
+    public Response getFunctionState(String tenant, String namespace, String functionName, String key, String clientRole) {
         FunctionState functionState = delegate.getFunctionState(
-                tenant, namespace, functionName, key, null, null);
+                tenant, namespace, functionName, key, clientRole, null);
 
         String value;
         if (functionState.getNumberValue() != null) {
@@ -169,24 +169,24 @@ public class FunctionsImplV2 {
     }
 
     public Response restartFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
-            uri) {
-        delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri, null, null);
+            uri, String clientRole) {
+        delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole, null);
         return Response.ok().build();
     }
 
-    public Response restartFunctionInstances(String tenant, String namespace, String functionName) {
-        delegate.restartFunctionInstances(tenant, namespace, functionName, null, null);
+    public Response restartFunctionInstances(String tenant, String namespace, String functionName, String clientRole) {
+        delegate.restartFunctionInstances(tenant, namespace, functionName, clientRole, null);
         return Response.ok().build();
     }
 
     public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
-            uri) {
-        delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, null ,null);
+            uri, String clientRole) {
+        delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole ,null);
         return Response.ok().build();
     }
 
-    public Response stopFunctionInstances(String tenant, String namespace, String functionName) {
-        delegate.stopFunctionInstances(tenant, namespace, functionName, null, null);
+    public Response stopFunctionInstances(String tenant, String namespace, String functionName, String clientRole) {
+        delegate.stopFunctionInstances(tenant, namespace, functionName, clientRole, null);
         return Response.ok().build();
     }
 
@@ -195,7 +195,7 @@ public class FunctionsImplV2 {
         return Response.ok().build();
     }
 
-    public Response downloadFunction(String path) {
+    public Response downloadFunction(String path, String clientRole) {
         return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path)).build();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 23d3b4f..c978cad 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -132,7 +132,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                     final @PathParam("functionName") String functionName) throws IOException {
 
         return functions.getFunctionInfo(
-                tenant, namespace, functionName);
+                tenant, namespace, functionName, clientAppId());
     }
 
     @GET
@@ -151,7 +151,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
 
-        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri(),
+                clientAppId());
     }
 
     @GET
@@ -167,8 +168,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response getFunctionStatus(final @PathParam("tenant") String tenant,
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStatusV2(
-                tenant, namespace, functionName, uri.getRequestUri());
+        return functions.getFunctionStatusV2(tenant, namespace, functionName, uri.getRequestUri(), clientAppId());
     }
 
     @GET
@@ -184,7 +184,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     @Path("/{tenant}/{namespace}")
     public Response listFunctions(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
-        return functions.listFunctions( tenant, namespace);
+        return functions.listFunctions(tenant, namespace, clientAppId());
     }
 
     @POST
@@ -206,7 +206,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                     final @FormDataParam("data") String triggerValue,
                                     final @FormDataParam("dataStream") InputStream triggerStream,
                                     final @FormDataParam("topic") String topic) {
-        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic,
+                clientAppId());
     }
 
     @GET
@@ -225,7 +226,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                      final @PathParam("namespace") String namespace,
                                      final @PathParam("functionName") String functionName,
                                      final @PathParam("key") String key) {
-        return functions.getFunctionState(tenant, namespace, functionName, key);
+        return functions.getFunctionState(tenant, namespace, functionName, key, clientAppId());
     }
 
     @POST
@@ -238,7 +239,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response restartFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
                                     final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(),
+                clientAppId());
     }
 
     @POST
@@ -250,7 +252,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.restartFunctionInstances(tenant, namespace, functionName);
+        return functions.restartFunctionInstances(tenant, namespace, functionName, clientAppId());
     }
 
     @POST
@@ -263,7 +265,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response stopFunction(final @PathParam("tenant") String tenant,
                                  final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
                                  final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(),
+                clientAppId());
     }
 
     @POST
@@ -275,7 +278,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopFunction(final @PathParam("tenant") String tenant,
                                  final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.stopFunctionInstances(tenant, namespace, functionName);
+        return functions.stopFunctionInstances(tenant, namespace, functionName, clientAppId());
     }
 
     @POST
@@ -297,7 +300,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     )
     @Path("/download")
     public Response downloadFunction(final @QueryParam("path") String path) {
-        return functions.downloadFunction(path);
+        return functions.downloadFunction(path, clientAppId());
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 053b19d..b05b0cd 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -1305,7 +1305,7 @@ public class FunctionApiV2ResourceTest {
         resource.getFunctionInfo(
                 tenant,
                 namespace,
-                function
+                function, null
         );
 
     }
@@ -1314,7 +1314,7 @@ public class FunctionApiV2ResourceTest {
         String json = (String) resource.getFunctionInfo(
                 tenant,
                 namespace,
-                function
+                function, null
         ).getEntity();
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
         mergeJson(json, functionDetailsBuilder);
@@ -1399,7 +1399,7 @@ public class FunctionApiV2ResourceTest {
     ) {
         resource.listFunctions(
                 tenant,
-                namespace
+                namespace, null
         );
 
     }
@@ -1407,7 +1407,7 @@ public class FunctionApiV2ResourceTest {
     private List<String> listDefaultFunctions() {
         return new Gson().fromJson((String) resource.listFunctions(
                 tenant,
-                namespace
+                namespace, null
         ).getEntity(), List.class);
     }
 
@@ -1434,7 +1434,7 @@ public class FunctionApiV2ResourceTest {
         String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
         String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
-        StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction(jarHttpUrl).getEntity();
+        StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction(jarHttpUrl, null).getEntity();
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
@@ -1449,7 +1449,7 @@ public class FunctionApiV2ResourceTest {
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
-        StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation).getEntity();
+        StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation, null).getEntity();
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 1eff4e8..c364906 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -76,6 +76,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -1413,7 +1414,11 @@ public class FunctionApiV3ResourceTest {
     public void testDownloadFunctionHttpUrl() throws Exception {
         String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
         String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionsImpl function = new FunctionsImpl(null);
+        WorkerService worker = mock(WorkerService.class);
+        WorkerConfig config = mock(WorkerConfig.class);
+        when(config.isAuthorizationEnabled()).thenReturn(false);
+        when(worker.getWorkerConfig()).thenReturn(config);
+        FunctionsImpl function = new FunctionsImpl(()-> worker);
         StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
@@ -1428,7 +1433,11 @@ public class FunctionApiV3ResourceTest {
     public void testDownloadFunctionFile() throws Exception {
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionsImpl function = new FunctionsImpl(null);
+        WorkerService worker = mock(WorkerService.class);
+        WorkerConfig config = mock(WorkerConfig.class);
+        when(config.isAuthorizationEnabled()).thenReturn(false);
+        when(worker.getWorkerConfig()).thenReturn(config);
+        FunctionsImpl function = new FunctionsImpl(() -> worker);
         StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);