You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/12/27 01:29:36 UTC

[pulsar] branch master updated: add backwards compatiblity to 2.2 for function admin API (#3241)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e63b658  add backwards compatiblity to 2.2 for function admin API (#3241)
e63b658 is described below

commit e63b658d8d0e5863d9b77658d3b1eb8b374f0de9
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Dec 26 17:29:31 2018 -0800

    add backwards compatiblity to 2.2 for function admin API (#3241)
    
    * add backwards compatiblity to 2.2 for function admin API
    
    * add license headers
    
    * renaming
    
    * fix tests
    
    * get integrations tests to pass
---
 .../org/apache/pulsar/broker/PulsarService.java    |   1 +
 .../apache/pulsar/broker/admin/v1/Functions.java   |   9 +-
 .../apache/pulsar/broker/admin/v2/Functions.java   | 303 ++++++++++-
 .../pulsar/broker/admin/{v2 => v3}/Functions.java  |   6 +-
 .../pulsar/broker/admin/{v2 => v3}/Sink.java       |   2 +-
 .../pulsar/broker/admin/{v2 => v3}/Source.java     |   2 +-
 .../client/admin/internal/FunctionsImpl.java       |   2 +-
 .../pulsar/client/admin/internal/SinkImpl.java     |   2 +-
 .../pulsar/client/admin/internal/SourceImpl.java   |   2 +-
 .../src/main/proto/InstanceCommunication.proto     |   8 +-
 .../pulsar/functions/worker/rest/Resources.java    |  19 +-
 .../pulsar/functions/worker/rest/WorkerServer.java |  10 +-
 .../functions/worker/rest/api/FunctionsImplV2.java | 210 +++++++
 .../worker/rest/api/v2/FunctionApiV2Resource.java  | 299 +++++-----
 .../FunctionApiV3Resource.java}                    |   6 +-
 .../SinkApiV3Resource.java}                        |   6 +-
 .../SourceApiV3Resource.java}                      |   6 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 603 ++++++++++-----------
 .../FunctionApiV3ResourceTest.java}                |   9 +-
 .../SinkApiV3ResourceTest.java}                    |   6 +-
 .../SourceApiV3ResourceTest.java}                  |   6 +-
 21 files changed, 1028 insertions(+), 489 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f576f95..417ec21 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -381,6 +381,7 @@ public class PulsarService implements AutoCloseable {
             this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap);
             this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap);
             this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
+            this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
             this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
 
             this.webService.addServlet("/metrics",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
index 7eca553..0591279 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
@@ -19,10 +19,17 @@
 package org.apache.pulsar.broker.admin.v1;
 
 import io.swagger.annotations.Api;
+
+import javax.ws.rs.Consumes;
 import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
 import org.apache.pulsar.broker.admin.impl.FunctionsBase;
 
 @Path("/functions")
 @Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
-public class Functions extends FunctionsBase {
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Functions extends  org.apache.pulsar.broker.admin.v2.Functions{
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index 854a365..19c3052 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -19,16 +19,311 @@
 package org.apache.pulsar.broker.admin.v2;
 
 import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
-
-import org.apache.pulsar.broker.admin.impl.FunctionsBase;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.function.Supplier;
 
 @Path("/functions")
 @Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class Functions extends FunctionsBase {
-}
+public class Functions extends AdminResource implements Supplier<WorkerService> {
+
+    private final FunctionsImplV2 functions;
+
+    public Functions() {
+        this.functions = new FunctionsImplV2(this);
+    }
+
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
+    }
+
+    @POST
+    @ApiOperation(value = "Creates a new Pulsar Function in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerFunction(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @FormDataParam("data") InputStream uploadedInputStream,
+                                     final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                     final @FormDataParam("url") String functionPkgUrl,
+                                     final @FormDataParam("functionDetails") String functionDetailsJson,
+                                     final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+    }
+
+    @PUT
+    @ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateFunction(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("functionName") String functionName,
+                                   final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("url") String functionPkgUrl,
+                                   final @FormDataParam("functionDetails") String functionDetailsJson,
+                                   final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+    }
+
+
+    @DELETE
+    @ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function doesn't exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "The function was successfully deleted")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}")
+    public Response deregisterFunction(final @PathParam("tenant") String tenant,
+                                       final @PathParam("namespace") String namespace,
+                                       final @PathParam("functionName") String functionName) {
+        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about a Pulsar Function currently running in cluster mode",
+            response = FunctionMetaData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}")
+    public Response getFunctionInfo(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName) throws IOException {
+
+        return functions.getFunctionInfo(
+                tenant, namespace, functionName);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Function instance",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
+    public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant,
+                                              final @PathParam("namespace") String namespace,
+                                              final @PathParam("functionName") String functionName,
+                                              final @PathParam("instanceId") String instanceId) throws IOException {
+
+        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Function running in cluster mode",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/status")
+    public Response getFunctionStatus(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace,
+                                      final @PathParam("functionName") String functionName) throws IOException {
+        return functions.getFunctionStatusV2(
+                tenant, namespace, functionName, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Lists all Pulsar Functions currently deployed in a given namespace",
+            response = String.class,
+            responseContainer = "Collection"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}")
+    public Response listFunctions(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace) {
+        return functions.listFunctions( tenant, namespace);
+    }
+
+    @POST
+    @ApiOperation(
+            value = "Triggers a Pulsar Function with a user-specified value or file data",
+            response = Message.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/trigger")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response triggerFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName,
+                                    final @FormDataParam("data") String triggerValue,
+                                    final @FormDataParam("dataStream") InputStream triggerStream,
+                                    final @FormDataParam("topic") String topic) {
+        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetch the current state associated with a Pulsar Function",
+            response = String.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The key does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @PathParam("key") String key) {
+        return functions.getFunctionState(tenant, namespace, functionName, key);
+    }
+
+    @POST
+    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+                                    final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all function instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+        return functions.restartFunctionInstances(tenant, namespace, functionName);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+                                 final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all function instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+        return functions.stopFunctionInstances(tenant, namespace, functionName);
+    }
+
+    @POST
+    @ApiOperation(
+            value = "Uploads Pulsar Function file data",
+            hidden = true
+    )
+    @Path("/upload")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("path") String path) {
+        return functions.uploadFunction(uploadedInputStream, path);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Downloads Pulsar Function file data",
+            hidden = true
+    )
+    @Path("/download")
+    public Response downloadFunction(final @QueryParam("path") String path) {
+        return functions.downloadFunction(path);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
+    @Path("/connectors")
+    public List<ConnectorDefinition> getConnectorsList() throws IOException {
+        return functions.getListOfConnectors();
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java
similarity index 96%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java
index 854a365..ce634a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java
@@ -16,16 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.admin.v2;
+package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.FunctionsBase;
+
 import javax.ws.rs.Consumes;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 
-import org.apache.pulsar.broker.admin.impl.FunctionsBase;
-
 @Path("/functions")
 @Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
 @Produces(MediaType.APPLICATION_JSON)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
similarity index 96%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
index aea0ae7..e137f08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.admin.v2;
+package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
 import org.apache.pulsar.broker.admin.impl.SinkBase;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
similarity index 96%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
index e5ef56c..24e84eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.admin.v2;
+package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
 import org.apache.pulsar.broker.admin.impl.SourceBase;
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 1493d1d..130704e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -56,7 +56,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
 
     public FunctionsImpl(WebTarget web, Authentication auth) {
         super(auth);
-        this.functions = web.path("/admin/functions");
+        this.functions = web.path("/admin/v3/functions");
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 1363d35..a9f99b8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -46,7 +46,7 @@ public class SinkImpl extends BaseResource implements Sink {
 
     public SinkImpl(WebTarget web, Authentication auth) {
         super(auth);
-        this.sink = web.path("/admin/v2/sink");
+        this.sink = web.path("/admin/v3/sink");
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index d0d36a5..2d066e0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -46,7 +46,7 @@ public class SourceImpl extends BaseResource implements Source {
 
     public SourceImpl(WebTarget web, Authentication auth) {
         super(auth);
-        this.source = web.path("/admin/v2/source");
+        this.source = web.path("/admin/v3/source");
     }
 
     @Override
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 1b50ea8..aec1dd3 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -60,10 +60,10 @@ message FunctionStatus {
 }
 
 // Deprecated
-//message FunctionStatusList {
-//    string error = 2;
-//    repeated FunctionStatus functionStatusList = 1;
-//}
+message FunctionStatusList {
+    string error = 2;
+    repeated FunctionStatus functionStatusList = 1;
+}
 
 message MetricsData {
 //    message DataDigest {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index ac011db..530f528 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,8 +20,9 @@ package org.apache.pulsar.functions.worker.rest;
 
 import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v2.SinkApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v2.SourceApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.FunctionApiV3Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.SourceApiV3Resource;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
@@ -34,17 +35,25 @@ public final class Resources {
     private Resources() {
     }
 
-    public static Set<Class<?>> getApiResources() {
+    public static Set<Class<?>> getApiV2Resources() {
         return new HashSet<>(
                 Arrays.asList(
                         FunctionApiV2Resource.class,
-                        SourceApiV2Resource.class,
-                        SinkApiV2Resource.class,
                         WorkerApiV2Resource.class,
                         MultiPartFeature.class
                 ));
     }
 
+    public static Set<Class<?>> getApiV3Resources() {
+        return new HashSet<>(
+                Arrays.asList(
+                        MultiPartFeature.class,
+                        SourceApiV3Resource.class,
+                        SinkApiV3Resource.class,
+                        FunctionApiV3Resource.class
+                ));
+    }
+
     public static Set<Class<?>> getRootResources() {
         return new HashSet<>(
                 Arrays.asList(
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 5bc2f4c..35a4b7a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -76,7 +76,7 @@ public class WorkerServer {
     public WorkerServer(WorkerService workerService) {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
-        this.webServerExecutor = new WebExecutorThreadPool("function-web");
+        this.webServerExecutor = new WebExecutorThreadPool(8, "function-web");
         init();
     }
 
@@ -93,11 +93,13 @@ public class WorkerServer {
         connector.setPort(this.workerConfig.getWorkerPort());
         connectors.add(connector);
 
-        List<Handler> handlers = new ArrayList<>(3);
+        List<Handler> handlers = new ArrayList<>(4);
         handlers.add(
-                newServletContextHandler("/admin", new ResourceConfig(Resources.getApiResources()), workerService));
+                newServletContextHandler("/admin", new ResourceConfig(Resources.getApiV2Resources()), workerService));
         handlers.add(
-                newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiResources()), workerService));
+                newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiV2Resources()), workerService));
+        handlers.add(
+                newServletContextHandler("/admin/v3", new ResourceConfig(Resources.getApiV3Resources()), workerService));
         handlers.add(newServletContextHandler("/", new ResourceConfig(Resources.getRootResources()), workerService));
 
         RequestLogHandler requestLogHandler = new RequestLogHandler();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
new file mode 100644
index 0000000..c71e1c2
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class FunctionsImplV2 {
+
+    private FunctionsImpl delegate;
+    public FunctionsImplV2(Supplier<WorkerService> workerServiceSupplier) {
+        this.delegate = new FunctionsImpl(workerServiceSupplier);
+    }
+
+    // For test purposes
+    public FunctionsImplV2(FunctionsImpl delegate) {
+        this.delegate = delegate;
+    }
+
+    public Response getFunctionInfo(final String tenant, final String namespace, final String functionName)
+            throws IOException {
+
+        // run just for parameter checks
+        delegate.getFunctionInfo(tenant, namespace, functionName);
+
+        FunctionMetaDataManager functionMetaDataManager = delegate.worker().getFunctionMetaDataManager();
+
+        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace,
+                functionName);
+        String functionDetailsJson = org.apache.pulsar.functions.utils.Utils.printJson(functionMetaData.getFunctionDetails());
+        return Response.status(Response.Status.OK).entity(functionDetailsJson).build();
+    }
+
+    public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
+                                              final String instanceId, URI uri) throws IOException {
+
+        org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
+                functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri);
+
+        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(toProto(functionInstanceStatus, instanceId));
+        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+    }
+
+    public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri) throws
+            IOException {
+        FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri);
+        InstanceCommunication.FunctionStatusList.Builder functionStatusList = InstanceCommunication.FunctionStatusList.newBuilder();
+        functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList(
+                toProto(functionInstanceStatus.getStatus(),
+                        String.valueOf(functionInstanceStatus.getInstanceId()))));
+        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatusList);
+        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+    }
+
+    public Response registerFunction(String tenant, String namespace, String functionName, InputStream
+            uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String
+                                             functionDetailsJson, String functionConfigJson, String clientAppId) {
+        delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+        return Response.ok().build();
+    }
+
+    public Response updateFunction(String tenant, String namespace, String functionName, InputStream uploadedInputStream,
+                                   FormDataContentDisposition fileDetail, String functionPkgUrl, String
+                                           functionDetailsJson, String functionConfigJson, String clientAppId) {
+        delegate.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+        return Response.ok().build();
+    }
+
+    public Response deregisterFunction(String tenant, String namespace, String functionName, String clientAppId) {
+        delegate.deregisterFunction(tenant, namespace, functionName, clientAppId);
+        return Response.ok().build();
+    }
+
+    public Response listFunctions(String tenant, String namespace) {
+        Collection<String> functionStateList = delegate.listFunctions( tenant, namespace);
+        return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
+    }
+
+    public Response triggerFunction(String tenant, String namespace, String functionName, String triggerValue,
+                                    InputStream triggerStream, String topic) {
+        String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+        return Response.status(Response.Status.OK).entity(result).build();
+    }
+
+    public Response getFunctionState(String tenant, String namespace, String functionName, String key) {
+        FunctionState functionState = delegate.getFunctionState(
+                tenant, namespace, functionName, key);
+
+        String value;
+        if (functionState.getNumberValue() != null) {
+            value = "value : " + functionState.getNumberValue() + ", version : " + functionState.getVersion();
+        } else {
+            value = "value : " + functionState.getStringValue() + ", version : " + functionState.getVersion();
+        }
+        return Response.status(Response.Status.OK)
+                .entity(value)
+                .build();
+    }
+
+    public Response restartFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
+            uri) {
+        delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+        return Response.ok().build();
+    }
+
+    public Response restartFunctionInstances(String tenant, String namespace, String functionName) {
+        delegate.restartFunctionInstances(tenant, namespace, functionName);
+        return Response.ok().build();
+    }
+
+    public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
+            uri) {
+        delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+        return Response.ok().build();
+    }
+
+    public Response stopFunctionInstances(String tenant, String namespace, String functionName) {
+        delegate.stopFunctionInstances(tenant, namespace, functionName);
+        return Response.ok().build();
+    }
+
+    public Response uploadFunction(InputStream uploadedInputStream, String path) {
+        delegate.uploadFunction(uploadedInputStream, path);
+        return Response.ok().build();
+    }
+
+    public Response downloadFunction(String path) {
+        return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path)).build();
+    }
+
+    public List<ConnectorDefinition> getListOfConnectors() {
+        return delegate.getListOfConnectors();
+    }
+
+    private InstanceCommunication.FunctionStatus toProto(
+            org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
+                    functionInstanceStatus, String instanceId) {
+        List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSysExceptions
+                = functionInstanceStatus.getLatestSystemExceptions()
+                .stream()
+                .map(exceptionInformation -> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                        .setExceptionString(exceptionInformation.getExceptionString())
+                        .setMsSinceEpoch(exceptionInformation.getTimestampMs())
+                        .build())
+                .collect(Collectors.toList());
+
+        List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions
+                = functionInstanceStatus.getLatestUserExceptions()
+                .stream()
+                .map(exceptionInformation -> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                        .setExceptionString(exceptionInformation.getExceptionString())
+                        .setMsSinceEpoch(exceptionInformation.getTimestampMs())
+                        .build())
+                .collect(Collectors.toList());
+
+
+        InstanceCommunication.FunctionStatus functionStatus = InstanceCommunication.FunctionStatus.newBuilder()
+                .setRunning(functionInstanceStatus.isRunning())
+                .setFailureException(functionInstanceStatus.getError())
+                .setNumRestarts(functionInstanceStatus.getNumRestarts())
+                .setNumSuccessfullyProcessed(functionInstanceStatus.getNumSuccessfullyProcessed())
+                .setNumUserExceptions(functionInstanceStatus.getNumUserExceptions())
+                .addAllLatestUserExceptions(latestUserExceptions)
+                .setNumSystemExceptions(functionInstanceStatus.getNumSystemExceptions())
+                .addAllLatestSystemExceptions(latestSysExceptions)
+                .setAverageLatency(functionInstanceStatus.getAverageLatency())
+                .setLastInvocationTime(functionInstanceStatus.getLastInvocationTime())
+                .setInstanceId(instanceId)
+                .setWorkerId(delegate.worker().getWorkerConfig().getWorkerId())
+                .build();
+
+        return functionStatus;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index b620fe5..2894c05 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -22,13 +22,12 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -39,10 +38,9 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -51,242 +49,271 @@ import java.util.List;
 @Path("/functions")
 public class FunctionApiV2Resource extends FunctionApiResource {
 
-    protected final FunctionsImpl functions;
+    protected final FunctionsImplV2 functions;
 
     public FunctionApiV2Resource() {
-        this.functions = new FunctionsImpl(this);
+        this.functions = new FunctionsImplV2(this);
     }
 
     @POST
+    @ApiOperation(value = "Creates a new Pulsar Function in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+    })
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void registerFunction(final @PathParam("tenant") String tenant,
-                                 final @PathParam("namespace") String namespace,
-                                 final @PathParam("functionName") String functionName,
-                                 final @FormDataParam("data") InputStream uploadedInputStream,
-                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                 final @FormDataParam("url") String functionPkgUrl,
-                                 final @FormDataParam("functionDetails") String functionDetailsJson,
-                                 final @FormDataParam("functionConfig") String functionConfigJson) {
+    public Response registerFunction(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @FormDataParam("data") InputStream uploadedInputStream,
+                                     final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                     final @FormDataParam("url") String functionPkgUrl,
+                                     final @FormDataParam("functionDetails") String functionDetailsJson,
+                                     final @FormDataParam("functionConfig") String functionConfigJson) {
 
-        functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+        return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
                 functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
-
     }
 
     @PUT
+    @ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+    })
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void updateFunction(final @PathParam("tenant") String tenant,
-                               final @PathParam("namespace") String namespace,
-                               final @PathParam("functionName") String functionName,
-                               final @FormDataParam("data") InputStream uploadedInputStream,
-                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                               final @FormDataParam("url") String functionPkgUrl,
-                               final @FormDataParam("functionDetails") String functionDetailsJson,
-                               final @FormDataParam("functionConfig") String functionConfigJson) {
+    public Response updateFunction(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("functionName") String functionName,
+                                   final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("url") String functionPkgUrl,
+                                   final @FormDataParam("functionDetails") String functionDetailsJson,
+                                   final @FormDataParam("functionConfig") String functionConfigJson) {
 
-        functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+        return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
                 functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
-
     }
 
+
     @DELETE
+    @ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function doesn't exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "The function was successfully deleted")
+    })
     @Path("/{tenant}/{namespace}/{functionName}")
-    public void deregisterFunction(final @PathParam("tenant") String tenant,
-                                   final @PathParam("namespace") String namespace,
-                                   final @PathParam("functionName") String functionName) {
-        functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+    public Response deregisterFunction(final @PathParam("tenant") String tenant,
+                                       final @PathParam("namespace") String namespace,
+                                       final @PathParam("functionName") String functionName) {
+        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
     }
 
     @GET
+    @ApiOperation(
+            value = "Fetches information about a Pulsar Function currently running in cluster mode",
+            response = Function.FunctionMetaData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
     @Path("/{tenant}/{namespace}/{functionName}")
-    public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
-                                          final @PathParam("namespace") String namespace,
-                                          final @PathParam("functionName") String functionName) {
-        return functions.getFunctionInfo(tenant, namespace, functionName);
+    public Response getFunctionInfo(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName) throws IOException {
+
+        return functions.getFunctionInfo(
+                tenant, namespace, functionName);
     }
 
     @GET
     @ApiOperation(
             value = "Displays the status of a Pulsar Function instance",
-            response = FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class
+            response = InstanceCommunication.FunctionStatus.class
     )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "The function doesn't exist")
     })
-    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
-    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
-            final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace,
-            final @PathParam("functionName") String functionName,
-            final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionInstanceStatus(
-                tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant,
+                                              final @PathParam("namespace") String namespace,
+                                              final @PathParam("functionName") String functionName,
+                                              final @PathParam("instanceId") String instanceId) throws IOException {
+
+        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @GET
     @ApiOperation(
-            value = "Displays the status of a Pulsar Function",
-            response = FunctionStatus.class
+            value = "Displays the status of a Pulsar Function running in cluster mode",
+            response = InstanceCommunication.FunctionStatus.class
     )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The function doesn't exist")
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
-    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/status")
-    public FunctionStatus getFunctionStatus(
-            final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace,
-            final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStatus(
+    public Response getFunctionStatus(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace,
+                                      final @PathParam("functionName") String functionName) throws IOException {
+        return functions.getFunctionStatusV2(
                 tenant, namespace, functionName, uri.getRequestUri());
     }
 
     @GET
     @ApiOperation(
-            value = "Displays the stats of a Pulsar Function",
-            response = FunctionStats.class
+            value = "Lists all Pulsar Functions currently deployed in a given namespace",
+            response = String.class,
+            responseContainer = "Collection"
     )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The function doesn't exist")
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
-    @Produces(MediaType.APPLICATION_JSON)
-    @Path("/{tenant}/{namespace}/{functionName}/stats")
-    public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
-                                          final @PathParam("namespace") String namespace,
-                                          final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
+    @Path("/{tenant}/{namespace}")
+    public Response listFunctions(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace) {
+        return functions.listFunctions( tenant, namespace);
     }
 
-    @GET
+    @POST
     @ApiOperation(
-            value = "Displays the stats of a Pulsar Function instance",
-            response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class
+            value = "Triggers a Pulsar Function with a user-specified value or file data",
+            response = Message.class
     )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The function doesn't exist")
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 500, message = "Internal server error")
     })
-    @Produces(MediaType.APPLICATION_JSON)
-    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
-    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(
-            final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace,
-            final @PathParam("functionName") String functionName,
-            final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionsInstanceStats(
-                tenant, namespace, functionName, instanceId, uri.getRequestUri());
-    }
-
-    @POST
     @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public String triggerFunction(final @PathParam("tenant") String tenant,
-                                  final @PathParam("namespace") String namespace,
-                                  final @PathParam("functionName") String functionName,
-                                  final @FormDataParam("data") String input,
-                                  final @FormDataParam("dataStream") InputStream uploadedInputStream,
-                                  final @FormDataParam("topic") String topic) {
-        return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
+    public Response triggerFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName,
+                                    final @FormDataParam("data") String triggerValue,
+                                    final @FormDataParam("dataStream") InputStream triggerStream,
+                                    final @FormDataParam("topic") String topic) {
+        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
     }
 
-    @POST
-    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @GET
+    @ApiOperation(
+            value = "Fetch the current state associated with a Pulsar Function",
+            response = String.class
+    )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The key does not exist"),
             @ApiResponse(code = 500, message = "Internal server error")
     })
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @PathParam("key") String key) {
+        return functions.getFunctionState(tenant, namespace, functionName, key);
+    }
+
+    @POST
+    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void restartFunction(final @PathParam("tenant") String tenant,
-                                final @PathParam("namespace") String namespace,
-                                final @PathParam("functionName") String functionName,
-                                final @PathParam("instanceId") String instanceId) {
-        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+                                    final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Restart all function instances", response = Void.class)
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
-    })
+            @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{functionName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void restartFunction(final @PathParam("tenant") String tenant,
-                                final @PathParam("namespace") String namespace,
-                                final @PathParam("functionName") String functionName) {
-        functions.restartFunctionInstances(tenant, namespace, functionName);
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+        return functions.restartFunctionInstances(tenant, namespace, functionName);
     }
 
     @POST
     @ApiOperation(value = "Stop function instance", response = Void.class)
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
-    })
+            @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void stopFunction(final @PathParam("tenant") String tenant,
-                             final @PathParam("namespace") String namespace,
-                             final @PathParam("functionName") String functionName,
-                             final @PathParam("instanceId") String instanceId) {
-        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+                                 final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Stop all function instances", response = Void.class)
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
-    })
+            @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{functionName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void stopFunction(final @PathParam("tenant") String tenant,
-                             final @PathParam("namespace") String namespace,
-                             final @PathParam("functionName") String functionName) {
-        functions.stopFunctionInstances(tenant, namespace, functionName);
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+        return functions.stopFunctionInstances(tenant, namespace, functionName);
     }
 
     @POST
+    @ApiOperation(
+            value = "Uploads Pulsar Function file data",
+            hidden = true
+    )
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
-                               final @FormDataParam("path") String path) {
-        functions.uploadFunction(uploadedInputStream, path);
+    public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("path") String path) {
+        return functions.uploadFunction(uploadedInputStream, path);
     }
 
     @GET
+    @ApiOperation(
+            value = "Downloads Pulsar Function file data",
+            hidden = true
+    )
     @Path("/download")
-    public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
+    public Response downloadFunction(final @QueryParam("path") String path) {
         return functions.downloadFunction(path);
     }
 
     @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
     @Path("/connectors")
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }
-
-    @GET
-    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
-    public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
-                                          final @PathParam("namespace") String namespace,
-                                          final @PathParam("functionName") String functionName,
-                                          final @PathParam("key") String key) throws IOException {
-        return functions.getFunctionState(tenant, namespace, functionName, key);
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
similarity index 98%
copy from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
copy to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index b620fe5..d25f8f9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -49,11 +49,11 @@ import java.util.List;
 
 @Slf4j
 @Path("/functions")
-public class FunctionApiV2Resource extends FunctionApiResource {
+public class FunctionApiV3Resource extends FunctionApiResource {
 
     protected final FunctionsImpl functions;
 
-    public FunctionApiV2Resource() {
+    public FunctionApiV3Resource() {
         this.functions = new FunctionsImpl(this);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
similarity index 98%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index e630db4..6a8f25e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -42,11 +42,11 @@ import java.util.List;
 
 @Slf4j
 @Path("/sink")
-public class SinkApiV2Resource extends FunctionApiResource {
+public class SinkApiV3Resource extends FunctionApiResource {
 
     protected final SinkImpl sink;
 
-    public SinkApiV2Resource() {
+    public SinkApiV3Resource() {
         this.sink = new SinkImpl(this);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
similarity index 98%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index bb49fe1..8675cc5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -41,11 +41,11 @@ import java.util.List;
 
 @Slf4j
 @Path("/source")
-public class SourceApiV2Resource extends FunctionApiResource {
+public class SourceApiV3Resource extends FunctionApiResource {
 
     protected final SourceImpl source;
 
-    public SourceApiV2Resource() {
+    public SourceApiV3Resource() {
         this.source = new SourceImpl(this);
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 25a3a66..aa22650 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
 import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -75,6 +76,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.pulsar.functions.utils.Utils.mergeJson;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -131,7 +133,7 @@ public class FunctionApiV2ResourceTest {
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
     private Namespace mockedNamespace;
-    private FunctionsImpl resource;
+    private FunctionsImplV2 resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
     private FunctionMetaData mockedFunctionMetadata;
@@ -167,16 +169,20 @@ public class FunctionApiV2ResourceTest {
 
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
-            .setWorkerId("test")
-            .setWorkerPort(8080)
-            .setDownloadDirectory("/tmp/pulsar/functions")
-            .setFunctionMetadataTopicName("pulsar/functions")
-            .setNumFunctionPackageReplicas(3)
-            .setPulsarServiceUrl("pulsar://localhost:6650/");
+                .setWorkerId("test")
+                .setWorkerPort(8080)
+                .setDownloadDirectory("/tmp/pulsar/functions")
+                .setFunctionMetadataTopicName("pulsar/functions")
+                .setNumFunctionPackageReplicas(3)
+                .setPulsarServiceUrl("pulsar://localhost:6650/");
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
-        this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+        FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService));
+
+        doReturn(ComponentImpl.ComponentType.FUNCTION).when(functions).calculateSubjectType(any());
+
+        this.resource = spy(new FunctionsImplV2(functions));
+
     }
 
     //
@@ -208,16 +214,16 @@ public class FunctionApiV2ResourceTest {
     public void testRegisterFunctionMissingNamespace() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                null,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    null,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     null);
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -228,22 +234,22 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
     public void testRegisterFunctionMissingFunctionName() {
         try {
-        testRegisterFunctionMissingArguments(
-            tenant,
-            namespace,
-            null,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                null);
-    } catch (RestException re){
-        assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
-        throw re;
-    }
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    null,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
@@ -292,16 +298,16 @@ public class FunctionApiV2ResourceTest {
     public void testRegisterFunctionMissingPackageDetails() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                null,
-                outputTopic,
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    null,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     null);
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -355,17 +361,17 @@ public class FunctionApiV2ResourceTest {
     public void testRegisterFunctionWrongParallelism() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                className,
-                -2,
-                null);
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    -2,
+                    null);
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -398,17 +404,17 @@ public class FunctionApiV2ResourceTest {
     public void testRegisterFunctionWrongOutputTopic() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                function + "-output-topic/test:",
-                outputSerdeClassName,
-                className,
-                parallelism,
-                null);
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    function + "-output-topic/test:",
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -419,17 +425,17 @@ public class FunctionApiV2ResourceTest {
     public void testRegisterFunctionHttpUrl() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                null,
-                outputTopic,
-                outputSerdeClassName,
-                className,
-                parallelism,
-                "http://localhost:1234/test");
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    null,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    "http://localhost:1234/test");
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -491,14 +497,14 @@ public class FunctionApiV2ResourceTest {
     private void registerDefaultFunction() {
         FunctionConfig functionConfig = createDefaultFunctionConfig();
         resource.registerFunction(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            mockedFormData,
-            null,
-            null,
-            new Gson().toJson(functionConfig),
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                mockedFormData,
+                null,
+                null,
+                new Gson().toJson(functionConfig),
                 null);
     }
 
@@ -523,8 +529,8 @@ public class FunctionApiV2ResourceTest {
 
             Utils.uploadFileToBookkeeper(
                     anyString(),
-                any(File.class),
-                any(Namespace.class));
+                    any(File.class),
+                    any(Namespace.class));
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
             registerDefaultFunction();
@@ -540,15 +546,15 @@ public class FunctionApiV2ResourceTest {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
             RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("function registered");
+                    .setSuccess(true)
+                    .setMessage("function registered");
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
@@ -587,15 +593,15 @@ public class FunctionApiV2ResourceTest {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
             RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("function failed to register");
+                    .setSuccess(false)
+                    .setMessage("function failed to register");
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
@@ -612,14 +618,14 @@ public class FunctionApiV2ResourceTest {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
             CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function registeration interrupted"));
+                    new IOException("Function registeration interrupted"));
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
             registerDefaultFunction();
@@ -637,16 +643,16 @@ public class FunctionApiV2ResourceTest {
     public void testUpdateFunctionMissingTenant() {
         try {
             testUpdateFunctionMissingArguments(
-                null,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    null,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     "Tenant is not provided");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -658,16 +664,16 @@ public class FunctionApiV2ResourceTest {
     public void testUpdateFunctionMissingNamespace() {
         try {
             testUpdateFunctionMissingArguments(
-                tenant,
-                null,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    null,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     "Namespace is not provided");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -679,16 +685,16 @@ public class FunctionApiV2ResourceTest {
     public void testUpdateFunctionMissingFunctionName() {
         try {
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                null,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    namespace,
+                    null,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     "Function Name is not provided");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -703,16 +709,16 @@ public class FunctionApiV2ResourceTest {
             doNothing().when(Utils.class);
             Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     "Update contains no change");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -751,16 +757,16 @@ public class FunctionApiV2ResourceTest {
             doNothing().when(Utils.class);
             Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                null,
-                parallelism,
+                    null,
+                    parallelism,
                     "Update contains no change");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -775,17 +781,17 @@ public class FunctionApiV2ResourceTest {
             doNothing().when(Utils.class);
             Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                null,
-                parallelism + 1,
-                null);
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    null,
+                    parallelism + 1,
+                    null);
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -799,17 +805,17 @@ public class FunctionApiV2ResourceTest {
             doNothing().when(Utils.class);
             Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                mockedFormData,
-                "DifferentOutput",
-                outputSerdeClassName,
-                null,
-                parallelism,
-                "Output topics differ");
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    "DifferentOutput",
+                    outputSerdeClassName,
+                    null,
+                    parallelism,
+                    "Output topics differ");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -825,17 +831,17 @@ public class FunctionApiV2ResourceTest {
             Map<String, String> someOtherInput = new HashMap<>();
             someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                someOtherInput,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                null,
-                parallelism,
-                "Input Topics cannot be altered");
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    someOtherInput,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    null,
+                    parallelism,
+                    "Input Topics cannot be altered");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -892,14 +898,14 @@ public class FunctionApiV2ResourceTest {
         }
 
         resource.updateFunction(
-            tenant,
-            namespace,
-            function,
-            inputStream,
-            details,
-            null,
-            null,
-            new Gson().toJson(functionConfig),
+                tenant,
+                namespace,
+                function,
+                inputStream,
+                details,
+                null,
+                null,
+                new Gson().toJson(functionConfig),
                 null);
 
     }
@@ -917,14 +923,14 @@ public class FunctionApiV2ResourceTest {
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
 
         resource.updateFunction(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            mockedFormData,
-            null,
-            null,
-            new Gson().toJson(functionConfig),
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                mockedFormData,
+                null,
+                null,
+                new Gson().toJson(functionConfig),
                 null);
     }
 
@@ -946,8 +952,8 @@ public class FunctionApiV2ResourceTest {
             doThrow(new IOException("upload failure")).when(Utils.class);
             Utils.uploadFileToBookkeeper(
                     anyString(),
-                any(File.class),
-                any(Namespace.class));
+                    any(File.class),
+                    any(Namespace.class));
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
@@ -963,15 +969,15 @@ public class FunctionApiV2ResourceTest {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
         Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("function registered");
+                .setSuccess(true)
+                .setMessage("function registered");
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
@@ -1000,18 +1006,18 @@ public class FunctionApiV2ResourceTest {
         RequestResult rr = new RequestResult()
                 .setSuccess(true)
                 .setMessage("function registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
         resource.updateFunction(
-            tenant,
-            namespace,
-            function,
-            null,
-            null,
-            filePackageUrl,
-            null,
-            new Gson().toJson(functionConfig),
+                tenant,
+                namespace,
+                function,
+                null,
+                null,
+                filePackageUrl,
+                null,
+                new Gson().toJson(functionConfig),
                 null);
 
     }
@@ -1022,15 +1028,15 @@ public class FunctionApiV2ResourceTest {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
             RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("function failed to register");
+                    .setSuccess(false)
+                    .setMessage("function failed to register");
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
@@ -1047,14 +1053,14 @@ public class FunctionApiV2ResourceTest {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
             CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function registeration interrupted"));
+                    new IOException("Function registeration interrupted"));
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
             updateDefaultFunction();
@@ -1073,9 +1079,9 @@ public class FunctionApiV2ResourceTest {
         try {
 
             testDeregisterFunctionMissingArguments(
-                null,
-                namespace,
-                function
+                    null,
+                    namespace,
+                    function
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1087,9 +1093,9 @@ public class FunctionApiV2ResourceTest {
     public void testDeregisterFunctionMissingNamespace() {
         try {
             testDeregisterFunctionMissingArguments(
-                tenant,
-                null,
-                function
+                    tenant,
+                    null,
+                    function
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1100,10 +1106,10 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
     public void testDeregisterFunctionMissingFunctionName() {
         try {
-             testDeregisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                null
+            testDeregisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    null
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1117,17 +1123,17 @@ public class FunctionApiV2ResourceTest {
             String function
     ) {
         resource.deregisterFunction(
-            tenant,
-            namespace,
-            function,
+                tenant,
+                namespace,
+                function,
                 null);
     }
 
     private void deregisterDefaultFunction() {
         resource.deregisterFunction(
-            tenant,
-            namespace,
-            function,
+                tenant,
+                namespace,
+                function,
                 null);
     }
 
@@ -1147,8 +1153,8 @@ public class FunctionApiV2ResourceTest {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("function deregistered");
+                .setSuccess(true)
+                .setMessage("function deregistered");
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
 
@@ -1161,8 +1167,8 @@ public class FunctionApiV2ResourceTest {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
             RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("function failed to deregister");
+                    .setSuccess(false)
+                    .setMessage("function failed to deregister");
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
 
@@ -1195,12 +1201,12 @@ public class FunctionApiV2ResourceTest {
     //
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
-    public void testGetFunctionMissingTenant() {
+    public void testGetFunctionMissingTenant() throws IOException {
         try {
             testGetFunctionMissingArguments(
-                null,
-                namespace,
-                function
+                    null,
+                    namespace,
+                    function
             );
         }
         catch (RestException re) {
@@ -1210,12 +1216,12 @@ public class FunctionApiV2ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
-    public void testGetFunctionMissingNamespace() {
+    public void testGetFunctionMissingNamespace() throws IOException {
         try {
             testGetFunctionMissingArguments(
-                tenant,
-                null,
-                function
+                    tenant,
+                    null,
+                    function
             );
         }
         catch (RestException re) {
@@ -1225,12 +1231,12 @@ public class FunctionApiV2ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
-    public void testGetFunctionMissingFunctionName() {
+    public void testGetFunctionMissingFunctionName() throws IOException {
         try {
             testGetFunctionMissingArguments(
-                tenant,
-                namespace,
-                null
+                    tenant,
+                    namespace,
+                    null
             );
         }
         catch (RestException re) {
@@ -1243,25 +1249,28 @@ public class FunctionApiV2ResourceTest {
             String tenant,
             String namespace,
             String function
-    ) {
+    ) throws IOException {
         resource.getFunctionInfo(
-            tenant,
-            namespace,
-            function
+                tenant,
+                namespace,
+                function
         );
 
     }
 
-    private FunctionConfig getDefaultFunctionInfo() {
-        return resource.getFunctionInfo(
-            tenant,
-            namespace,
-            function
-        );
+    private FunctionDetails getDefaultFunctionInfo() throws IOException {
+        String json = (String) resource.getFunctionInfo(
+                tenant,
+                namespace,
+                function
+        ).getEntity();
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+        mergeJson(json, functionDetailsBuilder);
+        return functionDetailsBuilder.build();
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
-    public void testGetNotExistedFunction() {
+    public void testGetNotExistedFunction() throws IOException {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
             getDefaultFunctionInfo();
@@ -1272,7 +1281,7 @@ public class FunctionApiV2ResourceTest {
     }
 
     @Test
-    public void testGetFunctionSuccess() {
+    public void testGetFunctionSuccess() throws IOException {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         SinkSpec sinkSpec = SinkSpec.newBuilder()
@@ -1289,17 +1298,17 @@ public class FunctionApiV2ResourceTest {
                 .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
                         .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
         FunctionMetaData metaData = FunctionMetaData.newBuilder()
-            .setCreateTime(System.currentTimeMillis())
-            .setFunctionDetails(functionDetails)
-            .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
-            .setVersion(1234)
-            .build();
+                .setCreateTime(System.currentTimeMillis())
+                .setFunctionDetails(functionDetails)
+                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
+                .setVersion(1234)
+                .build();
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
 
-        FunctionConfig functionConfig = getDefaultFunctionInfo();
+        FunctionDetails actual = getDefaultFunctionInfo();
         assertEquals(
-                FunctionConfigUtils.convertFromDetails(functionDetails),
-                functionConfig);
+                functionDetails,
+                actual);
     }
 
     //
@@ -1310,8 +1319,8 @@ public class FunctionApiV2ResourceTest {
     public void testListFunctionsMissingTenant() {
         try {
             testListFunctionsMissingArguments(
-                null,
-                namespace
+                    null,
+                    namespace
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1323,8 +1332,8 @@ public class FunctionApiV2ResourceTest {
     public void testListFunctionsMissingNamespace() {
         try {
             testListFunctionsMissingArguments(
-                tenant,
-                null
+                    tenant,
+                    null
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1337,17 +1346,17 @@ public class FunctionApiV2ResourceTest {
             String namespace
     ) {
         resource.listFunctions(
-            tenant,
-            namespace
+                tenant,
+                namespace
         );
 
     }
 
     private List<String> listDefaultFunctions() {
-        return resource.listFunctions(
-            tenant,
-            namespace
-        );
+        return new Gson().fromJson((String) resource.listFunctions(
+                tenant,
+                namespace
+        ).getEntity(), List.class);
     }
 
     @Test
@@ -1369,33 +1378,11 @@ public class FunctionApiV2ResourceTest {
     }
 
     @Test
-    public void testOnlyGetSources() {
-        List<String> functions = Lists.newArrayList("test-2");
-        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
-        FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
-                FunctionDetails.newBuilder().setName("test-1").build()).build();
-        functionMetaDataList.add(f1);
-        FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
-                FunctionDetails.newBuilder().setName("test-2").build()).build();
-        functionMetaDataList.add(f2);
-        FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
-                FunctionDetails.newBuilder().setName("test-3").build()).build();
-        functionMetaDataList.add(f3);
-        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
-        doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
-
-        List<String> functionList = listDefaultFunctions();
-        assertEquals(functions, functionList);
-    }
-
-    @Test
     public void testDownloadFunctionHttpUrl() throws Exception {
         String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
         String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionsImpl function = new FunctionsImpl(null);
-        StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
+        FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
+        StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction(jarHttpUrl).getEntity();
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
@@ -1409,8 +1396,8 @@ public class FunctionApiV2ResourceTest {
     public void testDownloadFunctionFile() throws Exception {
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionsImpl function = new FunctionsImpl(null);
-        StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
+        FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
+        StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation).getEntity();
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
similarity index 99%
copy from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
copy to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 25a3a66..580345e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
@@ -52,6 +52,7 @@ import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
 import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -93,7 +94,7 @@ import static org.testng.Assert.assertEquals;
 @PrepareForTest(Utils.class)
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
 @Slf4j
-public class FunctionApiV2ResourceTest {
+public class FunctionApiV3ResourceTest {
 
     @ObjectFactory
     public IObjectFactory getObjectFactory() {
@@ -1393,7 +1394,7 @@ public class FunctionApiV2ResourceTest {
     @Test
     public void testDownloadFunctionHttpUrl() throws Exception {
         String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
-        String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionsImpl function = new FunctionsImpl(null);
         StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
@@ -1408,7 +1409,7 @@ public class FunctionApiV2ResourceTest {
     @Test
     public void testDownloadFunctionFile() throws Exception {
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionsImpl function = new FunctionsImpl(null);
         StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
similarity index 99%
rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index ee9a014..bec3ca9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
@@ -87,12 +87,12 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
 /**
- * Unit test of {@link SinkApiV2Resource}.
+ * Unit test of {@link SinkApiV3Resource}.
  */
 @PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
-public class SinkApiV2ResourceTest {
+public class SinkApiV3ResourceTest {
 
     @ObjectFactory
     public IObjectFactory getObjectFactory() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
similarity index 99%
rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 194f624..72d23e2 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
@@ -86,12 +86,12 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
 /**
- * Unit test of {@link SourceApiV2Resource}.
+ * Unit test of {@link SourceApiV3Resource}.
  */
 @PrepareForTest({Utils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
-public class SourceApiV2ResourceTest {
+public class SourceApiV3ResourceTest {
 
     @ObjectFactory
     public IObjectFactory getObjectFactory() {