You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/27 01:29:33 UTC

[GitHub] jerrypeng closed pull request #3241: add backwards compatiblity to 2.2 for function admin API

jerrypeng closed pull request #3241: add backwards compatiblity to 2.2 for function admin API
URL: https://github.com/apache/pulsar/pull/3241
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 74e91bee9f..8a55608834 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -378,6 +378,7 @@ public Boolean get() {
             this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap);
             this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap);
             this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
+            this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
             this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
 
             this.webService.addServlet("/metrics",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
index 7eca553b24..0591279cee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
@@ -19,10 +19,17 @@
 package org.apache.pulsar.broker.admin.v1;
 
 import io.swagger.annotations.Api;
+
+import javax.ws.rs.Consumes;
 import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
 import org.apache.pulsar.broker.admin.impl.FunctionsBase;
 
 @Path("/functions")
 @Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
-public class Functions extends FunctionsBase {
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Functions extends  org.apache.pulsar.broker.admin.v2.Functions{
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index 854a365aaa..19c305232b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -19,16 +19,311 @@
 package org.apache.pulsar.broker.admin.v2;
 
 import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
-
-import org.apache.pulsar.broker.admin.impl.FunctionsBase;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.function.Supplier;
 
 @Path("/functions")
 @Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class Functions extends FunctionsBase {
-}
+public class Functions extends AdminResource implements Supplier<WorkerService> {
+
+    private final FunctionsImplV2 functions;
+
+    public Functions() {
+        this.functions = new FunctionsImplV2(this);
+    }
+
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
+    }
+
+    @POST
+    @ApiOperation(value = "Creates a new Pulsar Function in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerFunction(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @FormDataParam("data") InputStream uploadedInputStream,
+                                     final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                     final @FormDataParam("url") String functionPkgUrl,
+                                     final @FormDataParam("functionDetails") String functionDetailsJson,
+                                     final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+    }
+
+    @PUT
+    @ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateFunction(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("functionName") String functionName,
+                                   final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("url") String functionPkgUrl,
+                                   final @FormDataParam("functionDetails") String functionDetailsJson,
+                                   final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+    }
+
+
+    @DELETE
+    @ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function doesn't exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "The function was successfully deleted")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}")
+    public Response deregisterFunction(final @PathParam("tenant") String tenant,
+                                       final @PathParam("namespace") String namespace,
+                                       final @PathParam("functionName") String functionName) {
+        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about a Pulsar Function currently running in cluster mode",
+            response = FunctionMetaData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}")
+    public Response getFunctionInfo(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName) throws IOException {
+
+        return functions.getFunctionInfo(
+                tenant, namespace, functionName);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Function instance",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
+    public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant,
+                                              final @PathParam("namespace") String namespace,
+                                              final @PathParam("functionName") String functionName,
+                                              final @PathParam("instanceId") String instanceId) throws IOException {
+
+        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Function running in cluster mode",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/status")
+    public Response getFunctionStatus(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace,
+                                      final @PathParam("functionName") String functionName) throws IOException {
+        return functions.getFunctionStatusV2(
+                tenant, namespace, functionName, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Lists all Pulsar Functions currently deployed in a given namespace",
+            response = String.class,
+            responseContainer = "Collection"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}")
+    public Response listFunctions(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace) {
+        return functions.listFunctions( tenant, namespace);
+    }
+
+    @POST
+    @ApiOperation(
+            value = "Triggers a Pulsar Function with a user-specified value or file data",
+            response = Message.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/trigger")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response triggerFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName,
+                                    final @FormDataParam("data") String triggerValue,
+                                    final @FormDataParam("dataStream") InputStream triggerStream,
+                                    final @FormDataParam("topic") String topic) {
+        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetch the current state associated with a Pulsar Function",
+            response = String.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The key does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @PathParam("key") String key) {
+        return functions.getFunctionState(tenant, namespace, functionName, key);
+    }
+
+    @POST
+    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+                                    final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all function instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+        return functions.restartFunctionInstances(tenant, namespace, functionName);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+                                 final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all function instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+        return functions.stopFunctionInstances(tenant, namespace, functionName);
+    }
+
+    @POST
+    @ApiOperation(
+            value = "Uploads Pulsar Function file data",
+            hidden = true
+    )
+    @Path("/upload")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("path") String path) {
+        return functions.uploadFunction(uploadedInputStream, path);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Downloads Pulsar Function file data",
+            hidden = true
+    )
+    @Path("/download")
+    public Response downloadFunction(final @QueryParam("path") String path) {
+        return functions.downloadFunction(path);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
+    @Path("/connectors")
+    public List<ConnectorDefinition> getConnectorsList() throws IOException {
+        return functions.getListOfConnectors();
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java
new file mode 100644
index 0000000000..ce634a670e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v3;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.FunctionsBase;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/functions")
+@Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Functions extends FunctionsBase {
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
similarity index 96%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
index aea0ae72f0..e137f08d9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.admin.v2;
+package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
 import org.apache.pulsar.broker.admin.impl.SinkBase;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
similarity index 96%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
index e5ef56c0f4..24e84eb169 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.admin.v2;
+package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
 import org.apache.pulsar.broker.admin.impl.SourceBase;
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 1493d1df80..130704e590 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -56,7 +56,7 @@
 
     public FunctionsImpl(WebTarget web, Authentication auth) {
         super(auth);
-        this.functions = web.path("/admin/functions");
+        this.functions = web.path("/admin/v3/functions");
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 1363d3526e..a9f99b8bf3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -46,7 +46,7 @@
 
     public SinkImpl(WebTarget web, Authentication auth) {
         super(auth);
-        this.sink = web.path("/admin/v2/sink");
+        this.sink = web.path("/admin/v3/sink");
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index d0d36a5e19..2d066e08de 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -46,7 +46,7 @@
 
     public SourceImpl(WebTarget web, Authentication auth) {
         super(auth);
-        this.source = web.path("/admin/v2/source");
+        this.source = web.path("/admin/v3/source");
     }
 
     @Override
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 1b50ea8b6e..aec1dd3a6a 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -60,10 +60,10 @@ message FunctionStatus {
 }
 
 // Deprecated
-//message FunctionStatusList {
-//    string error = 2;
-//    repeated FunctionStatus functionStatusList = 1;
-//}
+message FunctionStatusList {
+    string error = 2;
+    repeated FunctionStatus functionStatusList = 1;
+}
 
 message MetricsData {
 //    message DataDigest {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index ac011db2f7..530f528996 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,8 +20,9 @@
 
 import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v2.SinkApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v2.SourceApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.FunctionApiV3Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.SourceApiV3Resource;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
@@ -34,17 +35,25 @@
     private Resources() {
     }
 
-    public static Set<Class<?>> getApiResources() {
+    public static Set<Class<?>> getApiV2Resources() {
         return new HashSet<>(
                 Arrays.asList(
                         FunctionApiV2Resource.class,
-                        SourceApiV2Resource.class,
-                        SinkApiV2Resource.class,
                         WorkerApiV2Resource.class,
                         MultiPartFeature.class
                 ));
     }
 
+    public static Set<Class<?>> getApiV3Resources() {
+        return new HashSet<>(
+                Arrays.asList(
+                        MultiPartFeature.class,
+                        SourceApiV3Resource.class,
+                        SinkApiV3Resource.class,
+                        FunctionApiV3Resource.class
+                ));
+    }
+
     public static Set<Class<?>> getRootResources() {
         return new HashSet<>(
                 Arrays.asList(
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 5bc2f4cd4d..35a4b7a7c7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -76,7 +76,7 @@ private static String getErrorMessage(Server server, int port, Exception ex) {
     public WorkerServer(WorkerService workerService) {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
-        this.webServerExecutor = new WebExecutorThreadPool("function-web");
+        this.webServerExecutor = new WebExecutorThreadPool(8, "function-web");
         init();
     }
 
@@ -93,11 +93,13 @@ private void init() {
         connector.setPort(this.workerConfig.getWorkerPort());
         connectors.add(connector);
 
-        List<Handler> handlers = new ArrayList<>(3);
+        List<Handler> handlers = new ArrayList<>(4);
         handlers.add(
-                newServletContextHandler("/admin", new ResourceConfig(Resources.getApiResources()), workerService));
+                newServletContextHandler("/admin", new ResourceConfig(Resources.getApiV2Resources()), workerService));
         handlers.add(
-                newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiResources()), workerService));
+                newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiV2Resources()), workerService));
+        handlers.add(
+                newServletContextHandler("/admin/v3", new ResourceConfig(Resources.getApiV3Resources()), workerService));
         handlers.add(newServletContextHandler("/", new ResourceConfig(Resources.getRootResources()), workerService));
 
         RequestLogHandler requestLogHandler = new RequestLogHandler();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
new file mode 100644
index 0000000000..c71e1c265b
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class FunctionsImplV2 {
+
+    private FunctionsImpl delegate;
+    public FunctionsImplV2(Supplier<WorkerService> workerServiceSupplier) {
+        this.delegate = new FunctionsImpl(workerServiceSupplier);
+    }
+
+    // For test purposes
+    public FunctionsImplV2(FunctionsImpl delegate) {
+        this.delegate = delegate;
+    }
+
+    public Response getFunctionInfo(final String tenant, final String namespace, final String functionName)
+            throws IOException {
+
+        // run just for parameter checks
+        delegate.getFunctionInfo(tenant, namespace, functionName);
+
+        FunctionMetaDataManager functionMetaDataManager = delegate.worker().getFunctionMetaDataManager();
+
+        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace,
+                functionName);
+        String functionDetailsJson = org.apache.pulsar.functions.utils.Utils.printJson(functionMetaData.getFunctionDetails());
+        return Response.status(Response.Status.OK).entity(functionDetailsJson).build();
+    }
+
+    public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
+                                              final String instanceId, URI uri) throws IOException {
+
+        org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
+                functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri);
+
+        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(toProto(functionInstanceStatus, instanceId));
+        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+    }
+
+    public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri) throws
+            IOException {
+        FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri);
+        InstanceCommunication.FunctionStatusList.Builder functionStatusList = InstanceCommunication.FunctionStatusList.newBuilder();
+        functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList(
+                toProto(functionInstanceStatus.getStatus(),
+                        String.valueOf(functionInstanceStatus.getInstanceId()))));
+        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatusList);
+        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+    }
+
+    public Response registerFunction(String tenant, String namespace, String functionName, InputStream
+            uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String
+                                             functionDetailsJson, String functionConfigJson, String clientAppId) {
+        delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+        return Response.ok().build();
+    }
+
+    public Response updateFunction(String tenant, String namespace, String functionName, InputStream uploadedInputStream,
+                                   FormDataContentDisposition fileDetail, String functionPkgUrl, String
+                                           functionDetailsJson, String functionConfigJson, String clientAppId) {
+        delegate.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+        return Response.ok().build();
+    }
+
+    public Response deregisterFunction(String tenant, String namespace, String functionName, String clientAppId) {
+        delegate.deregisterFunction(tenant, namespace, functionName, clientAppId);
+        return Response.ok().build();
+    }
+
+    public Response listFunctions(String tenant, String namespace) {
+        Collection<String> functionStateList = delegate.listFunctions( tenant, namespace);
+        return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
+    }
+
+    public Response triggerFunction(String tenant, String namespace, String functionName, String triggerValue,
+                                    InputStream triggerStream, String topic) {
+        String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+        return Response.status(Response.Status.OK).entity(result).build();
+    }
+
+    public Response getFunctionState(String tenant, String namespace, String functionName, String key) {
+        FunctionState functionState = delegate.getFunctionState(
+                tenant, namespace, functionName, key);
+
+        String value;
+        if (functionState.getNumberValue() != null) {
+            value = "value : " + functionState.getNumberValue() + ", version : " + functionState.getVersion();
+        } else {
+            value = "value : " + functionState.getStringValue() + ", version : " + functionState.getVersion();
+        }
+        return Response.status(Response.Status.OK)
+                .entity(value)
+                .build();
+    }
+
+    public Response restartFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
+            uri) {
+        delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+        return Response.ok().build();
+    }
+
+    public Response restartFunctionInstances(String tenant, String namespace, String functionName) {
+        delegate.restartFunctionInstances(tenant, namespace, functionName);
+        return Response.ok().build();
+    }
+
+    public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
+            uri) {
+        delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+        return Response.ok().build();
+    }
+
+    public Response stopFunctionInstances(String tenant, String namespace, String functionName) {
+        delegate.stopFunctionInstances(tenant, namespace, functionName);
+        return Response.ok().build();
+    }
+
+    public Response uploadFunction(InputStream uploadedInputStream, String path) {
+        delegate.uploadFunction(uploadedInputStream, path);
+        return Response.ok().build();
+    }
+
+    public Response downloadFunction(String path) {
+        return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path)).build();
+    }
+
+    public List<ConnectorDefinition> getListOfConnectors() {
+        return delegate.getListOfConnectors();
+    }
+
+    private InstanceCommunication.FunctionStatus toProto(
+            org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
+                    functionInstanceStatus, String instanceId) {
+        List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSysExceptions
+                = functionInstanceStatus.getLatestSystemExceptions()
+                .stream()
+                .map(exceptionInformation -> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                        .setExceptionString(exceptionInformation.getExceptionString())
+                        .setMsSinceEpoch(exceptionInformation.getTimestampMs())
+                        .build())
+                .collect(Collectors.toList());
+
+        List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions
+                = functionInstanceStatus.getLatestUserExceptions()
+                .stream()
+                .map(exceptionInformation -> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                        .setExceptionString(exceptionInformation.getExceptionString())
+                        .setMsSinceEpoch(exceptionInformation.getTimestampMs())
+                        .build())
+                .collect(Collectors.toList());
+
+
+        InstanceCommunication.FunctionStatus functionStatus = InstanceCommunication.FunctionStatus.newBuilder()
+                .setRunning(functionInstanceStatus.isRunning())
+                .setFailureException(functionInstanceStatus.getError())
+                .setNumRestarts(functionInstanceStatus.getNumRestarts())
+                .setNumSuccessfullyProcessed(functionInstanceStatus.getNumSuccessfullyProcessed())
+                .setNumUserExceptions(functionInstanceStatus.getNumUserExceptions())
+                .addAllLatestUserExceptions(latestUserExceptions)
+                .setNumSystemExceptions(functionInstanceStatus.getNumSystemExceptions())
+                .addAllLatestSystemExceptions(latestSysExceptions)
+                .setAverageLatency(functionInstanceStatus.getAverageLatency())
+                .setLastInvocationTime(functionInstanceStatus.getLastInvocationTime())
+                .setInstanceId(instanceId)
+                .setWorkerId(delegate.worker().getWorkerConfig().getWorkerId())
+                .build();
+
+        return functionStatus;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index b620fe5222..2894c051f8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -22,13 +22,12 @@
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -39,10 +38,9 @@
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -51,242 +49,271 @@
 @Path("/functions")
 public class FunctionApiV2Resource extends FunctionApiResource {
 
-    protected final FunctionsImpl functions;
+    protected final FunctionsImplV2 functions;
 
     public FunctionApiV2Resource() {
-        this.functions = new FunctionsImpl(this);
+        this.functions = new FunctionsImplV2(this);
     }
 
     @POST
+    @ApiOperation(value = "Creates a new Pulsar Function in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+    })
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void registerFunction(final @PathParam("tenant") String tenant,
-                                 final @PathParam("namespace") String namespace,
-                                 final @PathParam("functionName") String functionName,
-                                 final @FormDataParam("data") InputStream uploadedInputStream,
-                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                 final @FormDataParam("url") String functionPkgUrl,
-                                 final @FormDataParam("functionDetails") String functionDetailsJson,
-                                 final @FormDataParam("functionConfig") String functionConfigJson) {
+    public Response registerFunction(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @FormDataParam("data") InputStream uploadedInputStream,
+                                     final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                     final @FormDataParam("url") String functionPkgUrl,
+                                     final @FormDataParam("functionDetails") String functionDetailsJson,
+                                     final @FormDataParam("functionConfig") String functionConfigJson) {
 
-        functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+        return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
                 functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
-
     }
 
     @PUT
+    @ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+    })
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void updateFunction(final @PathParam("tenant") String tenant,
-                               final @PathParam("namespace") String namespace,
-                               final @PathParam("functionName") String functionName,
-                               final @FormDataParam("data") InputStream uploadedInputStream,
-                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                               final @FormDataParam("url") String functionPkgUrl,
-                               final @FormDataParam("functionDetails") String functionDetailsJson,
-                               final @FormDataParam("functionConfig") String functionConfigJson) {
+    public Response updateFunction(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("functionName") String functionName,
+                                   final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("url") String functionPkgUrl,
+                                   final @FormDataParam("functionDetails") String functionDetailsJson,
+                                   final @FormDataParam("functionConfig") String functionConfigJson) {
 
-        functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+        return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
                 functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
-
     }
 
+
     @DELETE
+    @ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function doesn't exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "The function was successfully deleted")
+    })
     @Path("/{tenant}/{namespace}/{functionName}")
-    public void deregisterFunction(final @PathParam("tenant") String tenant,
-                                   final @PathParam("namespace") String namespace,
-                                   final @PathParam("functionName") String functionName) {
-        functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+    public Response deregisterFunction(final @PathParam("tenant") String tenant,
+                                       final @PathParam("namespace") String namespace,
+                                       final @PathParam("functionName") String functionName) {
+        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
     }
 
     @GET
+    @ApiOperation(
+            value = "Fetches information about a Pulsar Function currently running in cluster mode",
+            response = Function.FunctionMetaData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
     @Path("/{tenant}/{namespace}/{functionName}")
-    public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
-                                          final @PathParam("namespace") String namespace,
-                                          final @PathParam("functionName") String functionName) {
-        return functions.getFunctionInfo(tenant, namespace, functionName);
+    public Response getFunctionInfo(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName) throws IOException {
+
+        return functions.getFunctionInfo(
+                tenant, namespace, functionName);
     }
 
     @GET
     @ApiOperation(
             value = "Displays the status of a Pulsar Function instance",
-            response = FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class
+            response = InstanceCommunication.FunctionStatus.class
     )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "The function doesn't exist")
     })
-    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
-    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
-            final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace,
-            final @PathParam("functionName") String functionName,
-            final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionInstanceStatus(
-                tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant,
+                                              final @PathParam("namespace") String namespace,
+                                              final @PathParam("functionName") String functionName,
+                                              final @PathParam("instanceId") String instanceId) throws IOException {
+
+        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @GET
     @ApiOperation(
-            value = "Displays the status of a Pulsar Function",
-            response = FunctionStatus.class
+            value = "Displays the status of a Pulsar Function running in cluster mode",
+            response = InstanceCommunication.FunctionStatus.class
     )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The function doesn't exist")
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
-    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/status")
-    public FunctionStatus getFunctionStatus(
-            final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace,
-            final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStatus(
+    public Response getFunctionStatus(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace,
+                                      final @PathParam("functionName") String functionName) throws IOException {
+        return functions.getFunctionStatusV2(
                 tenant, namespace, functionName, uri.getRequestUri());
     }
 
     @GET
     @ApiOperation(
-            value = "Displays the stats of a Pulsar Function",
-            response = FunctionStats.class
+            value = "Lists all Pulsar Functions currently deployed in a given namespace",
+            response = String.class,
+            responseContainer = "Collection"
     )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The function doesn't exist")
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
-    @Produces(MediaType.APPLICATION_JSON)
-    @Path("/{tenant}/{namespace}/{functionName}/stats")
-    public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
-                                          final @PathParam("namespace") String namespace,
-                                          final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
+    @Path("/{tenant}/{namespace}")
+    public Response listFunctions(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace) {
+        return functions.listFunctions( tenant, namespace);
     }
 
-    @GET
+    @POST
     @ApiOperation(
-            value = "Displays the stats of a Pulsar Function instance",
-            response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class
+            value = "Triggers a Pulsar Function with a user-specified value or file data",
+            response = Message.class
     )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The function doesn't exist")
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 500, message = "Internal server error")
     })
-    @Produces(MediaType.APPLICATION_JSON)
-    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
-    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(
-            final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace,
-            final @PathParam("functionName") String functionName,
-            final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionsInstanceStats(
-                tenant, namespace, functionName, instanceId, uri.getRequestUri());
-    }
-
-    @POST
     @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public String triggerFunction(final @PathParam("tenant") String tenant,
-                                  final @PathParam("namespace") String namespace,
-                                  final @PathParam("functionName") String functionName,
-                                  final @FormDataParam("data") String input,
-                                  final @FormDataParam("dataStream") InputStream uploadedInputStream,
-                                  final @FormDataParam("topic") String topic) {
-        return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
+    public Response triggerFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName,
+                                    final @FormDataParam("data") String triggerValue,
+                                    final @FormDataParam("dataStream") InputStream triggerStream,
+                                    final @FormDataParam("topic") String topic) {
+        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
     }
 
-    @POST
-    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @GET
+    @ApiOperation(
+            value = "Fetch the current state associated with a Pulsar Function",
+            response = String.class
+    )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The key does not exist"),
             @ApiResponse(code = 500, message = "Internal server error")
     })
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @PathParam("key") String key) {
+        return functions.getFunctionState(tenant, namespace, functionName, key);
+    }
+
+    @POST
+    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void restartFunction(final @PathParam("tenant") String tenant,
-                                final @PathParam("namespace") String namespace,
-                                final @PathParam("functionName") String functionName,
-                                final @PathParam("instanceId") String instanceId) {
-        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+                                    final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Restart all function instances", response = Void.class)
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
-    })
+            @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{functionName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void restartFunction(final @PathParam("tenant") String tenant,
-                                final @PathParam("namespace") String namespace,
-                                final @PathParam("functionName") String functionName) {
-        functions.restartFunctionInstances(tenant, namespace, functionName);
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+        return functions.restartFunctionInstances(tenant, namespace, functionName);
     }
 
     @POST
     @ApiOperation(value = "Stop function instance", response = Void.class)
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
-    })
+            @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void stopFunction(final @PathParam("tenant") String tenant,
-                             final @PathParam("namespace") String namespace,
-                             final @PathParam("functionName") String functionName,
-                             final @PathParam("instanceId") String instanceId) {
-        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+                                 final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Stop all function instances", response = Void.class)
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
-    })
+            @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{functionName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void stopFunction(final @PathParam("tenant") String tenant,
-                             final @PathParam("namespace") String namespace,
-                             final @PathParam("functionName") String functionName) {
-        functions.stopFunctionInstances(tenant, namespace, functionName);
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+        return functions.stopFunctionInstances(tenant, namespace, functionName);
     }
 
     @POST
+    @ApiOperation(
+            value = "Uploads Pulsar Function file data",
+            hidden = true
+    )
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
-                               final @FormDataParam("path") String path) {
-        functions.uploadFunction(uploadedInputStream, path);
+    public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("path") String path) {
+        return functions.uploadFunction(uploadedInputStream, path);
     }
 
     @GET
+    @ApiOperation(
+            value = "Downloads Pulsar Function file data",
+            hidden = true
+    )
     @Path("/download")
-    public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
+    public Response downloadFunction(final @QueryParam("path") String path) {
         return functions.downloadFunction(path);
     }
 
     @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
     @Path("/connectors")
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }
-
-    @GET
-    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
-    public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
-                                          final @PathParam("namespace") String namespace,
-                                          final @PathParam("functionName") String functionName,
-                                          final @PathParam("key") String key) throws IOException {
-        return functions.getFunctionState(tenant, namespace, functionName, key);
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
new file mode 100644
index 0000000000..d25f8f9405
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v3;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+@Slf4j
+@Path("/functions")
+public class FunctionApiV3Resource extends FunctionApiResource {
+
+    protected final FunctionsImpl functions;
+
+    public FunctionApiV3Resource() {
+        this.functions = new FunctionsImpl(this);
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public void registerFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("functionName") String functionName,
+                                 final @FormDataParam("data") InputStream uploadedInputStream,
+                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String functionPkgUrl,
+                                 final @FormDataParam("functionDetails") String functionDetailsJson,
+                                 final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+
+    }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public void updateFunction(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("functionName") String functionName,
+                               final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                               final @FormDataParam("url") String functionPkgUrl,
+                               final @FormDataParam("functionDetails") String functionDetailsJson,
+                               final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{functionName}")
+    public void deregisterFunction(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("functionName") String functionName) {
+        functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{functionName}")
+    public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("functionName") String functionName) {
+        return functions.getFunctionInfo(tenant, namespace, functionName);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Function instance",
+            response = FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
+    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
+            final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace,
+            final @PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+                tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Function",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{tenant}/{namespace}/{functionName}/status")
+    public FunctionStatus getFunctionStatus(
+            final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace,
+            final @PathParam("functionName") String functionName) throws IOException {
+        return functions.getFunctionStatus(
+                tenant, namespace, functionName, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the stats of a Pulsar Function",
+            response = FunctionStats.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{tenant}/{namespace}/{functionName}/stats")
+    public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("functionName") String functionName) throws IOException {
+        return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the stats of a Pulsar Function instance",
+            response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(
+            final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace,
+            final @PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionsInstanceStats(
+                tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{functionName}/trigger")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public String triggerFunction(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("functionName") String functionName,
+                                  final @FormDataParam("data") String input,
+                                  final @FormDataParam("dataStream") InputStream uploadedInputStream,
+                                  final @FormDataParam("topic") String topic) {
+        return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
+    }
+
+    @POST
+    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void restartFunction(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace,
+                                final @PathParam("functionName") String functionName,
+                                final @PathParam("instanceId") String instanceId) {
+        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all function instances", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void restartFunction(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace,
+                                final @PathParam("functionName") String functionName) {
+        functions.restartFunctionInstances(tenant, namespace, functionName);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop function instance", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void stopFunction(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("functionName") String functionName,
+                             final @PathParam("instanceId") String instanceId) {
+        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all function instances", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void stopFunction(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("functionName") String functionName) {
+        functions.stopFunctionInstances(tenant, namespace, functionName);
+    }
+
+    @POST
+    @Path("/upload")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("path") String path) {
+        functions.uploadFunction(uploadedInputStream, path);
+    }
+
+    @GET
+    @Path("/download")
+    public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
+        return functions.downloadFunction(path);
+    }
+
+    @GET
+    @Path("/connectors")
+    public List<ConnectorDefinition> getConnectorsList() throws IOException {
+        return functions.getListOfConnectors();
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("functionName") String functionName,
+                                          final @PathParam("key") String key) throws IOException {
+        return functions.getFunctionState(tenant, namespace, functionName, key);
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
similarity index 98%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index e630db4668..6a8f25e977 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -42,11 +42,11 @@
 
 @Slf4j
 @Path("/sink")
-public class SinkApiV2Resource extends FunctionApiResource {
+public class SinkApiV3Resource extends FunctionApiResource {
 
     protected final SinkImpl sink;
 
-    public SinkApiV2Resource() {
+    public SinkApiV3Resource() {
         this.sink = new SinkImpl(this);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
similarity index 98%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index bb49fe1995..8675cc5888 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -41,11 +41,11 @@
 
 @Slf4j
 @Path("/source")
-public class SourceApiV2Resource extends FunctionApiResource {
+public class SourceApiV3Resource extends FunctionApiResource {
 
     protected final SourceImpl source;
 
-    public SourceApiV2Resource() {
+    public SourceApiV3Resource() {
         this.source = new SourceImpl(this);
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 25a3a6604f..aa22650417 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -52,6 +52,7 @@
 import org.apache.pulsar.functions.worker.rest.RestException;
 import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -75,6 +76,7 @@
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.pulsar.functions.utils.Utils.mergeJson;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -131,7 +133,7 @@ public String process(String input, Context context) {
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
     private Namespace mockedNamespace;
-    private FunctionsImpl resource;
+    private FunctionsImplV2 resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
     private FunctionMetaData mockedFunctionMetadata;
@@ -167,16 +169,20 @@ public void setup() throws Exception {
 
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
-            .setWorkerId("test")
-            .setWorkerPort(8080)
-            .setDownloadDirectory("/tmp/pulsar/functions")
-            .setFunctionMetadataTopicName("pulsar/functions")
-            .setNumFunctionPackageReplicas(3)
-            .setPulsarServiceUrl("pulsar://localhost:6650/");
+                .setWorkerId("test")
+                .setWorkerPort(8080)
+                .setDownloadDirectory("/tmp/pulsar/functions")
+                .setFunctionMetadataTopicName("pulsar/functions")
+                .setNumFunctionPackageReplicas(3)
+                .setPulsarServiceUrl("pulsar://localhost:6650/");
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
-        this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+        FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService));
+
+        doReturn(ComponentImpl.ComponentType.FUNCTION).when(functions).calculateSubjectType(any());
+
+        this.resource = spy(new FunctionsImplV2(functions));
+
     }
 
     //
@@ -208,16 +214,16 @@ public void testRegisterFunctionMissingTenant() {
     public void testRegisterFunctionMissingNamespace() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                null,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    null,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     null);
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -228,22 +234,22 @@ public void testRegisterFunctionMissingNamespace() {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
     public void testRegisterFunctionMissingFunctionName() {
         try {
-        testRegisterFunctionMissingArguments(
-            tenant,
-            namespace,
-            null,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                null);
-    } catch (RestException re){
-        assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
-        throw re;
-    }
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    null,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
@@ -292,16 +298,16 @@ public void testRegisterFunctionMissingInputTopics() {
     public void testRegisterFunctionMissingPackageDetails() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                null,
-                outputTopic,
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    null,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     null);
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -355,17 +361,17 @@ public void testRegisterFunctionWrongClassName() {
     public void testRegisterFunctionWrongParallelism() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                className,
-                -2,
-                null);
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    -2,
+                    null);
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -398,17 +404,17 @@ public void testRegisterFunctionSameInputOutput() {
     public void testRegisterFunctionWrongOutputTopic() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                function + "-output-topic/test:",
-                outputSerdeClassName,
-                className,
-                parallelism,
-                null);
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    function + "-output-topic/test:",
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -419,17 +425,17 @@ public void testRegisterFunctionWrongOutputTopic() {
     public void testRegisterFunctionHttpUrl() {
         try {
             testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                null,
-                outputTopic,
-                outputSerdeClassName,
-                className,
-                parallelism,
-                "http://localhost:1234/test");
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    null,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    "http://localhost:1234/test");
         } catch (RestException re){
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -491,14 +497,14 @@ private void testRegisterFunctionMissingArguments(
     private void registerDefaultFunction() {
         FunctionConfig functionConfig = createDefaultFunctionConfig();
         resource.registerFunction(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            mockedFormData,
-            null,
-            null,
-            new Gson().toJson(functionConfig),
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                mockedFormData,
+                null,
+                null,
+                new Gson().toJson(functionConfig),
                 null);
     }
 
@@ -523,8 +529,8 @@ public void testRegisterFunctionUploadFailure() throws Exception {
 
             Utils.uploadFileToBookkeeper(
                     anyString(),
-                any(File.class),
-                any(Namespace.class));
+                    any(File.class),
+                    any(Namespace.class));
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
             registerDefaultFunction();
@@ -540,15 +546,15 @@ public void testRegisterFunctionSuccess() throws Exception {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
             RequestResult rr = new RequestResult()
-                .setSuccess(true)
-                .setMessage("function registered");
+                    .setSuccess(true)
+                    .setMessage("function registered");
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
@@ -587,15 +593,15 @@ public void testRegisterFunctionFailure() throws Exception {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
             RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("function failed to register");
+                    .setSuccess(false)
+                    .setMessage("function failed to register");
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
@@ -612,14 +618,14 @@ public void testRegisterFunctionInterrupted() throws Exception {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
             CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function registeration interrupted"));
+                    new IOException("Function registeration interrupted"));
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
             registerDefaultFunction();
@@ -637,16 +643,16 @@ public void testRegisterFunctionInterrupted() throws Exception {
     public void testUpdateFunctionMissingTenant() {
         try {
             testUpdateFunctionMissingArguments(
-                null,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    null,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     "Tenant is not provided");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -658,16 +664,16 @@ public void testUpdateFunctionMissingTenant() {
     public void testUpdateFunctionMissingNamespace() {
         try {
             testUpdateFunctionMissingArguments(
-                tenant,
-                null,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    null,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     "Namespace is not provided");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -679,16 +685,16 @@ public void testUpdateFunctionMissingNamespace() {
     public void testUpdateFunctionMissingFunctionName() {
         try {
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                null,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    namespace,
+                    null,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     "Function Name is not provided");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -703,16 +709,16 @@ public void testUpdateFunctionMissingPackage() throws IOException {
             doNothing().when(Utils.class);
             Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                className,
-                parallelism,
+                    className,
+                    parallelism,
                     "Update contains no change");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -751,16 +757,16 @@ public void testUpdateFunctionMissingClassName() throws IOException {
             doNothing().when(Utils.class);
             Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
                     outputSerdeClassName,
-                null,
-                parallelism,
+                    null,
+                    parallelism,
                     "Update contains no change");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -775,17 +781,17 @@ public void testUpdateFunctionChangedParallelism() throws IOException {
             doNothing().when(Utils.class);
             Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                null,
-                parallelism + 1,
-                null);
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    null,
+                    parallelism + 1,
+                    null);
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -799,17 +805,17 @@ public void testUpdateFunctionChangedInputs() throws IOException {
             doNothing().when(Utils.class);
             Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                mockedFormData,
-                "DifferentOutput",
-                outputSerdeClassName,
-                null,
-                parallelism,
-                "Output topics differ");
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    "DifferentOutput",
+                    outputSerdeClassName,
+                    null,
+                    parallelism,
+                    "Output topics differ");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -825,17 +831,17 @@ public void testUpdateFunctionChangedOutput() throws IOException {
             Map<String, String> someOtherInput = new HashMap<>();
             someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
             testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                someOtherInput,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                null,
-                parallelism,
-                "Input Topics cannot be altered");
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    someOtherInput,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    null,
+                    parallelism,
+                    "Input Topics cannot be altered");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
             throw re;
@@ -892,14 +898,14 @@ private void testUpdateFunctionMissingArguments(
         }
 
         resource.updateFunction(
-            tenant,
-            namespace,
-            function,
-            inputStream,
-            details,
-            null,
-            null,
-            new Gson().toJson(functionConfig),
+                tenant,
+                namespace,
+                function,
+                inputStream,
+                details,
+                null,
+                null,
+                new Gson().toJson(functionConfig),
                 null);
 
     }
@@ -917,14 +923,14 @@ private void updateDefaultFunction() {
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
 
         resource.updateFunction(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            mockedFormData,
-            null,
-            null,
-            new Gson().toJson(functionConfig),
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                mockedFormData,
+                null,
+                null,
+                new Gson().toJson(functionConfig),
                 null);
     }
 
@@ -946,8 +952,8 @@ public void testUpdateFunctionUploadFailure() throws Exception {
             doThrow(new IOException("upload failure")).when(Utils.class);
             Utils.uploadFileToBookkeeper(
                     anyString(),
-                any(File.class),
-                any(Namespace.class));
+                    any(File.class),
+                    any(Namespace.class));
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
@@ -963,15 +969,15 @@ public void testUpdateFunctionSuccess() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
         Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("function registered");
+                .setSuccess(true)
+                .setMessage("function registered");
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
@@ -1000,18 +1006,18 @@ public void testUpdateFunctionWithUrl() {
         RequestResult rr = new RequestResult()
                 .setSuccess(true)
                 .setMessage("function registered");
-            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
         resource.updateFunction(
-            tenant,
-            namespace,
-            function,
-            null,
-            null,
-            filePackageUrl,
-            null,
-            new Gson().toJson(functionConfig),
+                tenant,
+                namespace,
+                function,
+                null,
+                null,
+                filePackageUrl,
+                null,
+                new Gson().toJson(functionConfig),
                 null);
 
     }
@@ -1022,15 +1028,15 @@ public void testUpdateFunctionFailure() throws Exception {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
             RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("function failed to register");
+                    .setSuccess(false)
+                    .setMessage("function failed to register");
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
@@ -1047,14 +1053,14 @@ public void testUpdateFunctionInterrupted() throws Exception {
             mockStatic(Utils.class);
             doNothing().when(Utils.class);
             Utils.uploadToBookeeper(
-                any(Namespace.class),
-                any(InputStream.class),
-                anyString());
+                    any(Namespace.class),
+                    any(InputStream.class),
+                    anyString());
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
             CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-                new IOException("Function registeration interrupted"));
+                    new IOException("Function registeration interrupted"));
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
             updateDefaultFunction();
@@ -1073,9 +1079,9 @@ public void testDeregisterFunctionMissingTenant() {
         try {
 
             testDeregisterFunctionMissingArguments(
-                null,
-                namespace,
-                function
+                    null,
+                    namespace,
+                    function
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1087,9 +1093,9 @@ public void testDeregisterFunctionMissingTenant() {
     public void testDeregisterFunctionMissingNamespace() {
         try {
             testDeregisterFunctionMissingArguments(
-                tenant,
-                null,
-                function
+                    tenant,
+                    null,
+                    function
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1100,10 +1106,10 @@ public void testDeregisterFunctionMissingNamespace() {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
     public void testDeregisterFunctionMissingFunctionName() {
         try {
-             testDeregisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                null
+            testDeregisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    null
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1117,17 +1123,17 @@ private void testDeregisterFunctionMissingArguments(
             String function
     ) {
         resource.deregisterFunction(
-            tenant,
-            namespace,
-            function,
+                tenant,
+                namespace,
+                function,
                 null);
     }
 
     private void deregisterDefaultFunction() {
         resource.deregisterFunction(
-            tenant,
-            namespace,
-            function,
+                tenant,
+                namespace,
+                function,
                 null);
     }
 
@@ -1147,8 +1153,8 @@ public void testDeregisterFunctionSuccess() {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("function deregistered");
+                .setSuccess(true)
+                .setMessage("function deregistered");
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
 
@@ -1161,8 +1167,8 @@ public void testDeregisterFunctionFailure() {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
             RequestResult rr = new RequestResult()
-                .setSuccess(false)
-                .setMessage("function failed to deregister");
+                    .setSuccess(false)
+                    .setMessage("function failed to deregister");
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
 
@@ -1195,12 +1201,12 @@ public void testDeregisterFunctionInterrupted() {
     //
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
-    public void testGetFunctionMissingTenant() {
+    public void testGetFunctionMissingTenant() throws IOException {
         try {
             testGetFunctionMissingArguments(
-                null,
-                namespace,
-                function
+                    null,
+                    namespace,
+                    function
             );
         }
         catch (RestException re) {
@@ -1210,12 +1216,12 @@ public void testGetFunctionMissingTenant() {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
-    public void testGetFunctionMissingNamespace() {
+    public void testGetFunctionMissingNamespace() throws IOException {
         try {
             testGetFunctionMissingArguments(
-                tenant,
-                null,
-                function
+                    tenant,
+                    null,
+                    function
             );
         }
         catch (RestException re) {
@@ -1225,12 +1231,12 @@ public void testGetFunctionMissingNamespace() {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
-    public void testGetFunctionMissingFunctionName() {
+    public void testGetFunctionMissingFunctionName() throws IOException {
         try {
             testGetFunctionMissingArguments(
-                tenant,
-                namespace,
-                null
+                    tenant,
+                    namespace,
+                    null
             );
         }
         catch (RestException re) {
@@ -1243,25 +1249,28 @@ private void testGetFunctionMissingArguments(
             String tenant,
             String namespace,
             String function
-    ) {
+    ) throws IOException {
         resource.getFunctionInfo(
-            tenant,
-            namespace,
-            function
+                tenant,
+                namespace,
+                function
         );
 
     }
 
-    private FunctionConfig getDefaultFunctionInfo() {
-        return resource.getFunctionInfo(
-            tenant,
-            namespace,
-            function
-        );
+    private FunctionDetails getDefaultFunctionInfo() throws IOException {
+        String json = (String) resource.getFunctionInfo(
+                tenant,
+                namespace,
+                function
+        ).getEntity();
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+        mergeJson(json, functionDetailsBuilder);
+        return functionDetailsBuilder.build();
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
-    public void testGetNotExistedFunction() {
+    public void testGetNotExistedFunction() throws IOException {
         try {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
             getDefaultFunctionInfo();
@@ -1272,7 +1281,7 @@ public void testGetNotExistedFunction() {
     }
 
     @Test
-    public void testGetFunctionSuccess() {
+    public void testGetFunctionSuccess() throws IOException {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         SinkSpec sinkSpec = SinkSpec.newBuilder()
@@ -1289,17 +1298,17 @@ public void testGetFunctionSuccess() {
                 .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
                         .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
         FunctionMetaData metaData = FunctionMetaData.newBuilder()
-            .setCreateTime(System.currentTimeMillis())
-            .setFunctionDetails(functionDetails)
-            .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
-            .setVersion(1234)
-            .build();
+                .setCreateTime(System.currentTimeMillis())
+                .setFunctionDetails(functionDetails)
+                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
+                .setVersion(1234)
+                .build();
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
 
-        FunctionConfig functionConfig = getDefaultFunctionInfo();
+        FunctionDetails actual = getDefaultFunctionInfo();
         assertEquals(
-                FunctionConfigUtils.convertFromDetails(functionDetails),
-                functionConfig);
+                functionDetails,
+                actual);
     }
 
     //
@@ -1310,8 +1319,8 @@ public void testGetFunctionSuccess() {
     public void testListFunctionsMissingTenant() {
         try {
             testListFunctionsMissingArguments(
-                null,
-                namespace
+                    null,
+                    namespace
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1323,8 +1332,8 @@ public void testListFunctionsMissingTenant() {
     public void testListFunctionsMissingNamespace() {
         try {
             testListFunctionsMissingArguments(
-                tenant,
-                null
+                    tenant,
+                    null
             );
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1337,17 +1346,17 @@ private void testListFunctionsMissingArguments(
             String namespace
     ) {
         resource.listFunctions(
-            tenant,
-            namespace
+                tenant,
+                namespace
         );
 
     }
 
     private List<String> listDefaultFunctions() {
-        return resource.listFunctions(
-            tenant,
-            namespace
-        );
+        return new Gson().fromJson((String) resource.listFunctions(
+                tenant,
+                namespace
+        ).getEntity(), List.class);
     }
 
     @Test
@@ -1368,34 +1377,12 @@ public void testListFunctionsSuccess() {
         assertEquals(functions, functionList);
     }
 
-    @Test
-    public void testOnlyGetSources() {
-        List<String> functions = Lists.newArrayList("test-2");
-        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
-        FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
-                FunctionDetails.newBuilder().setName("test-1").build()).build();
-        functionMetaDataList.add(f1);
-        FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
-                FunctionDetails.newBuilder().setName("test-2").build()).build();
-        functionMetaDataList.add(f2);
-        FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
-                FunctionDetails.newBuilder().setName("test-3").build()).build();
-        functionMetaDataList.add(f3);
-        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
-        doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
-
-        List<String> functionList = listDefaultFunctions();
-        assertEquals(functions, functionList);
-    }
-
     @Test
     public void testDownloadFunctionHttpUrl() throws Exception {
         String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
         String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionsImpl function = new FunctionsImpl(null);
-        StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
+        FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
+        StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction(jarHttpUrl).getEntity();
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
@@ -1409,8 +1396,8 @@ public void testDownloadFunctionHttpUrl() throws Exception {
     public void testDownloadFunctionFile() throws Exception {
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionsImpl function = new FunctionsImpl(null);
-        StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
+        FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
+        StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation).getEntity();
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
new file mode 100644
index 0000000000..580345e3c1
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -0,0 +1,1500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v3;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SubscriptionType;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.rest.RestException;
+import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link FunctionApiV2Resource}.
+ */
+@PrepareForTest(Utils.class)
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
+@Slf4j
+public class FunctionApiV3ResourceTest {
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    private static final class TestFunction implements Function<String, String> {
+
+        @Override
+        public String process(String input, Context context) {
+            return input;
+        }
+    }
+
+    private static final String tenant = "test-tenant";
+    private static final String namespace = "test-namespace";
+    private static final String function = "test-function";
+    private static final String outputTopic = "test-output-topic";
+    private static final String outputSerdeClassName = TopicSchema.DEFAULT_SERDE;
+    private static final String className = TestFunction.class.getName();
+    private SubscriptionType subscriptionType = SubscriptionType.FAILOVER;
+    private static final Map<String, String> topicsToSerDeClassName = new HashMap<>();
+    static {
+        topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE);
+    }
+    private static final int parallelism = 1;
+
+    private WorkerService mockedWorkerService;
+    private PulsarAdmin mockedPulsarAdmin;
+    private Tenants mockedTenants;
+    private Namespaces mockedNamespaces;
+    private TenantInfo mockedTenantInfo;
+    private List<String> namespaceList = new LinkedList<>();
+    private FunctionMetaDataManager mockedManager;
+    private FunctionRuntimeManager mockedFunctionRunTimeManager;
+    private RuntimeFactory mockedRuntimeFactory;
+    private Namespace mockedNamespace;
+    private FunctionsImpl resource;
+    private InputStream mockedInputStream;
+    private FormDataContentDisposition mockedFormData;
+    private FunctionMetaData mockedFunctionMetadata;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        this.mockedManager = mock(FunctionMetaDataManager.class);
+        this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class);
+        this.mockedTenantInfo = mock(TenantInfo.class);
+        this.mockedRuntimeFactory = mock(RuntimeFactory.class);
+        this.mockedInputStream = mock(InputStream.class);
+        this.mockedNamespace = mock(Namespace.class);
+        this.mockedFormData = mock(FormDataContentDisposition.class);
+        when(mockedFormData.getFileName()).thenReturn("test");
+        this.mockedPulsarAdmin = mock(PulsarAdmin.class);
+        this.mockedTenants = mock(Tenants.class);
+        this.mockedNamespaces = mock(Namespaces.class);
+        this.mockedFunctionMetadata = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        namespaceList.add(tenant + "/" + namespace);
+
+        this.mockedWorkerService = mock(WorkerService.class);
+        when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+        when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
+        when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
+        when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
+        when(mockedWorkerService.isInitialized()).thenReturn(true);
+        when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
+        when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
+        when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+        when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetadata);
+
+        // worker config
+        WorkerConfig workerConfig = new WorkerConfig()
+            .setWorkerId("test")
+            .setWorkerPort(8080)
+            .setDownloadDirectory("/tmp/pulsar/functions")
+            .setFunctionMetadataTopicName("pulsar/functions")
+            .setNumFunctionPackageReplicas(3)
+            .setPulsarServiceUrl("pulsar://localhost:6650/");
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
+
+        this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
+        doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+    }
+
+    //
+    // Register Functions
+    //
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testRegisterFunctionMissingTenant() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    null,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testRegisterFunctionMissingNamespace() {
+        try {
+            testRegisterFunctionMissingArguments(
+                tenant,
+                null,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
+    public void testRegisterFunctionMissingFunctionName() {
+        try {
+        testRegisterFunctionMissingArguments(
+            tenant,
+            namespace,
+            null,
+            mockedInputStream,
+            topicsToSerDeClassName,
+            mockedFormData,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                null);
+    } catch (RestException re){
+        assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+        throw re;
+    }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
+    public void testRegisterFunctionMissingPackage() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "No input topic\\(s\\) specified for the function")
+    public void testRegisterFunctionMissingInputTopics() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    null,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
+    public void testRegisterFunctionMissingPackageDetails() {
+        try {
+            testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                null,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function classname cannot be null")
+    public void testRegisterFunctionMissingClassName() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    null,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "User class must be in class path")
+    public void testRegisterFunctionWrongClassName() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    "UnknownClass",
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function parallelism should positive number")
+    public void testRegisterFunctionWrongParallelism() {
+        try {
+            testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                -2,
+                null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class,
+            expectedExceptionsMessageRegExp = "Output topic persistent://sample/standalone/ns1/test_src is also being used as an input topic \\(topics must be one or the other\\)")
+    public void testRegisterFunctionSameInputOutput() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    topicsToSerDeClassName.keySet().iterator().next(),
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topic " + function + "-output-topic/test:" + " is invalid")
+    public void testRegisterFunctionWrongOutputTopic() {
+        try {
+            testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                function + "-output-topic/test:",
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Corrupted Jar File")
+    public void testRegisterFunctionHttpUrl() {
+        try {
+            testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                null,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "http://localhost:1234/test");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    private void testRegisterFunctionMissingArguments(
+            String tenant,
+            String namespace,
+            String function,
+            InputStream inputStream,
+            Map<String, String> topicsToSerDeClassName,
+            FormDataContentDisposition details,
+            String outputTopic,
+            String outputSerdeClassName,
+            String className,
+            Integer parallelism,
+            String functionPkgUrl) {
+        FunctionConfig functionConfig = new FunctionConfig();
+        if (tenant != null) {
+            functionConfig.setTenant(tenant);
+        }
+        if (namespace != null) {
+            functionConfig.setNamespace(namespace);
+        }
+        if (function != null) {
+            functionConfig.setName(function);
+        }
+        if (topicsToSerDeClassName != null) {
+            functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        }
+        if (outputTopic != null) {
+            functionConfig.setOutput(outputTopic);
+        }
+        if (outputSerdeClassName != null) {
+            functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        }
+        if (className != null) {
+            functionConfig.setClassName(className);
+        }
+        if (parallelism != null) {
+            functionConfig.setParallelism(parallelism);
+        }
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+
+        resource.registerFunction(
+                tenant,
+                namespace,
+                function,
+                inputStream,
+                details,
+                functionPkgUrl,
+                null,
+                new Gson().toJson(functionConfig),
+                null);
+
+    }
+
+    private void registerDefaultFunction() {
+        FunctionConfig functionConfig = createDefaultFunctionConfig();
+        resource.registerFunction(
+            tenant,
+            namespace,
+            function,
+            mockedInputStream,
+            mockedFormData,
+            null,
+            null,
+            new Gson().toJson(functionConfig),
+                null);
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function already exists")
+    public void testRegisterExistedFunction() {
+        try {
+            Configurator.setRootLevel(Level.DEBUG);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
+    public void testRegisterFunctionUploadFailure() throws Exception {
+        try {
+            mockStatic(Utils.class);
+            doThrow(new IOException("upload failure")).when(Utils.class);
+
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                any(File.class),
+                any(Namespace.class));
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
+    }
+
+    @Test
+    public void testRegisterFunctionSuccess() throws Exception {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+            RequestResult rr = new RequestResult()
+                .setSuccess(true)
+                .setMessage("function registered");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace does not exist")
+    public void testRegisterFunctionNonExistingNamespace() {
+        try {
+            this.namespaceList.clear();
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant does not exist")
+    public void testRegisterFunctionNonexistantTenant() throws Exception {
+        try {
+            when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
+    public void testRegisterFunctionFailure() throws Exception {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("function failed to register");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+    public void testRegisterFunctionInterrupted() throws Exception {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                new IOException("Function registeration interrupted"));
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
+    }
+
+    //
+    // Update Functions
+    //
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testUpdateFunctionMissingTenant() {
+        try {
+            testUpdateFunctionMissingArguments(
+                null,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    "Tenant is not provided");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testUpdateFunctionMissingNamespace() {
+        try {
+            testUpdateFunctionMissingArguments(
+                tenant,
+                null,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    "Namespace is not provided");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
+    public void testUpdateFunctionMissingFunctionName() {
+        try {
+            testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                null,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    "Function Name is not provided");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
+    public void testUpdateFunctionMissingPackage() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    "Update contains no change");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
+    public void testUpdateFunctionMissingInputTopic() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    null,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    "Update contains no change");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
+    public void testUpdateFunctionMissingClassName() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                null,
+                parallelism,
+                    "Update contains no change");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test
+    public void testUpdateFunctionChangedParallelism() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                null,
+                parallelism + 1,
+                null);
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topics differ")
+    public void testUpdateFunctionChangedInputs() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                "DifferentOutput",
+                outputSerdeClassName,
+                null,
+                parallelism,
+                "Output topics differ");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
+    public void testUpdateFunctionChangedOutput() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            Map<String, String> someOtherInput = new HashMap<>();
+            someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
+            testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                someOtherInput,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                null,
+                parallelism,
+                "Input Topics cannot be altered");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    private void testUpdateFunctionMissingArguments(
+            String tenant,
+            String namespace,
+            String function,
+            InputStream inputStream,
+            Map<String, String> topicsToSerDeClassName,
+            FormDataContentDisposition details,
+            String outputTopic,
+            String outputSerdeClassName,
+            String className,
+            Integer parallelism,
+            String expectedError) {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        if (tenant != null) {
+            functionConfig.setTenant(tenant);
+        }
+        if (namespace != null) {
+            functionConfig.setNamespace(namespace);
+        }
+        if (function != null) {
+            functionConfig.setName(function);
+        }
+        if (topicsToSerDeClassName != null) {
+            functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        }
+        if (outputTopic != null) {
+            functionConfig.setOutput(outputTopic);
+        }
+        if (outputSerdeClassName != null) {
+            functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        }
+        if (className != null) {
+            functionConfig.setClassName(className);
+        }
+        if (parallelism != null) {
+            functionConfig.setParallelism(parallelism);
+        }
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+
+        if (expectedError == null) {
+            RequestResult rr = new RequestResult()
+                    .setSuccess(true)
+                    .setMessage("function registered");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        }
+
+        resource.updateFunction(
+            tenant,
+            namespace,
+            function,
+            inputStream,
+            details,
+            null,
+            null,
+            new Gson().toJson(functionConfig),
+                null);
+
+    }
+
+    private void updateDefaultFunction() {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+
+        resource.updateFunction(
+            tenant,
+            namespace,
+            function,
+            mockedInputStream,
+            mockedFormData,
+            null,
+            null,
+            new Gson().toJson(functionConfig),
+                null);
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
+    public void testUpdateNotExistedFunction() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+            updateDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
+    public void testUpdateFunctionUploadFailure() throws Exception {
+        try {
+            mockStatic(Utils.class);
+            doThrow(new IOException("upload failure")).when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                any(File.class),
+                any(Namespace.class));
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+            updateDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
+    }
+
+    @Test
+    public void testUpdateFunctionSuccess() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(true)
+            .setMessage("function registered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        updateDefaultFunction();
+    }
+
+    @Test
+    public void testUpdateFunctionWithUrl() {
+        Configurator.setRootLevel(Level.DEBUG);
+
+        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String filePackageUrl = "file://" + fileLocation;
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+        RequestResult rr = new RequestResult()
+                .setSuccess(true)
+                .setMessage("function registered");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        resource.updateFunction(
+            tenant,
+            namespace,
+            function,
+            null,
+            null,
+            filePackageUrl,
+            null,
+            new Gson().toJson(functionConfig),
+                null);
+
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
+    public void testUpdateFunctionFailure() throws Exception {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("function failed to register");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+            updateDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+    public void testUpdateFunctionInterrupted() throws Exception {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                new IOException("Function registeration interrupted"));
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+            updateDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
+    }
+
+    //
+    // deregister function
+    //
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testDeregisterFunctionMissingTenant() {
+        try {
+
+            testDeregisterFunctionMissingArguments(
+                null,
+                namespace,
+                function
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testDeregisterFunctionMissingNamespace() {
+        try {
+            testDeregisterFunctionMissingArguments(
+                tenant,
+                null,
+                function
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
+    public void testDeregisterFunctionMissingFunctionName() {
+        try {
+             testDeregisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                null
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    private void testDeregisterFunctionMissingArguments(
+            String tenant,
+            String namespace,
+            String function
+    ) {
+        resource.deregisterFunction(
+            tenant,
+            namespace,
+            function,
+                null);
+    }
+
+    private void deregisterDefaultFunction() {
+        resource.deregisterFunction(
+            tenant,
+            namespace,
+            function,
+                null);
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
+    public void testDeregisterNotExistedFunction() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+            deregisterDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
+            throw re;
+        }
+    }
+
+    @Test
+    public void testDeregisterFunctionSuccess() {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(true)
+            .setMessage("function deregistered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+
+        deregisterDefaultFunction();
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to deregister")
+    public void testDeregisterFunctionFailure() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("function failed to deregister");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+
+            deregisterDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregisteration interrupted")
+    public void testDeregisterFunctionInterrupted() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                    new IOException("Function deregisteration interrupted"));
+            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+
+            deregisterDefaultFunction();
+        }
+        catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
+    }
+
+    //
+    // Get Function Info
+    //
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testGetFunctionMissingTenant() {
+        try {
+            testGetFunctionMissingArguments(
+                null,
+                namespace,
+                function
+            );
+        }
+        catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testGetFunctionMissingNamespace() {
+        try {
+            testGetFunctionMissingArguments(
+                tenant,
+                null,
+                function
+            );
+        }
+        catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
+    public void testGetFunctionMissingFunctionName() {
+        try {
+            testGetFunctionMissingArguments(
+                tenant,
+                namespace,
+                null
+            );
+        }
+        catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    private void testGetFunctionMissingArguments(
+            String tenant,
+            String namespace,
+            String function
+    ) {
+        resource.getFunctionInfo(
+            tenant,
+            namespace,
+            function
+        );
+
+    }
+
+    private FunctionConfig getDefaultFunctionInfo() {
+        return resource.getFunctionInfo(
+            tenant,
+            namespace,
+            function
+        );
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
+    public void testGetNotExistedFunction() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+            getDefaultFunctionInfo();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
+            throw re;
+        }
+    }
+
+    @Test
+    public void testGetFunctionSuccess() {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+        SinkSpec sinkSpec = SinkSpec.newBuilder()
+                .setTopic(outputTopic)
+                .setSerDeClassName(outputSerdeClassName).build();
+        FunctionDetails functionDetails = FunctionDetails.newBuilder()
+                .setClassName(className)
+                .setSink(sinkSpec)
+                .setName(function)
+                .setNamespace(namespace)
+                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
+                .setTenant(tenant)
+                .setParallelism(parallelism)
+                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
+                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+        FunctionMetaData metaData = FunctionMetaData.newBuilder()
+            .setCreateTime(System.currentTimeMillis())
+            .setFunctionDetails(functionDetails)
+            .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
+            .setVersion(1234)
+            .build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
+
+        FunctionConfig functionConfig = getDefaultFunctionInfo();
+        assertEquals(
+                FunctionConfigUtils.convertFromDetails(functionDetails),
+                functionConfig);
+    }
+
+    //
+    // List Functions
+    //
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testListFunctionsMissingTenant() {
+        try {
+            testListFunctionsMissingArguments(
+                null,
+                namespace
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testListFunctionsMissingNamespace() {
+        try {
+            testListFunctionsMissingArguments(
+                tenant,
+                null
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    private void testListFunctionsMissingArguments(
+            String tenant,
+            String namespace
+    ) {
+        resource.listFunctions(
+            tenant,
+            namespace
+        );
+
+    }
+
+    private List<String> listDefaultFunctions() {
+        return resource.listFunctions(
+            tenant,
+            namespace
+        );
+    }
+
+    @Test
+    public void testListFunctionsSuccess() {
+        final List<String> functions = Lists.newArrayList("test-1", "test-2");
+        final List<FunctionMetaData> metaDataList = new LinkedList<>();
+        FunctionMetaData functionMetaData1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()
+        ).build();
+        FunctionMetaData functionMetaData2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()
+        ).build();
+        metaDataList.add(functionMetaData1);
+        metaDataList.add(functionMetaData2);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(metaDataList);
+
+        List<String> functionList = listDefaultFunctions();
+        assertEquals(functions, functionList);
+    }
+
+    @Test
+    public void testOnlyGetSources() {
+        List<String> functions = Lists.newArrayList("test-2");
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()).build();
+        functionMetaDataList.add(f1);
+        FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()).build();
+        functionMetaDataList.add(f2);
+        FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-3").build()).build();
+        functionMetaDataList.add(f3);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+        doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+
+        List<String> functionList = listDefaultFunctions();
+        assertEquals(functions, functionList);
+    }
+
+    @Test
+    public void testDownloadFunctionHttpUrl() throws Exception {
+        String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
+        String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionsImpl function = new FunctionsImpl(null);
+        StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
+        File pkgFile = new File(testDir, UUID.randomUUID().toString());
+        OutputStream output = new FileOutputStream(pkgFile);
+        streamOutput.write(output);
+        Assert.assertTrue(pkgFile.exists());
+        if (pkgFile.exists()) {
+            pkgFile.delete();
+        }
+    }
+
+    @Test
+    public void testDownloadFunctionFile() throws Exception {
+        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionsImpl function = new FunctionsImpl(null);
+        StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
+        File pkgFile = new File(testDir, UUID.randomUUID().toString());
+        OutputStream output = new FileOutputStream(pkgFile);
+        streamOutput.write(output);
+        Assert.assertTrue(pkgFile.exists());
+        if (pkgFile.exists()) {
+            pkgFile.delete();
+        }
+    }
+
+    @Test
+    public void testRegisterFunctionFileUrlWithValidSinkClass() {
+        Configurator.setRootLevel(Level.DEBUG);
+
+        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String filePackageUrl = "file://" + fileLocation;
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+        RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
+                null, new Gson().toJson(functionConfig), null);
+
+    }
+
+    @Test
+    public void testRegisterFunctionWithConflictingFields() {
+        Configurator.setRootLevel(Level.DEBUG);
+        String actualTenant = "DIFFERENT_TENANT";
+        String actualNamespace = "DIFFERENT_NAMESPACE";
+        String actualName = "DIFFERENT_NAME";
+        this.namespaceList.add(actualTenant + "/" + actualNamespace);
+
+        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String filePackageUrl = "file://" + fileLocation;
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+        when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
+
+        RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
+                null, new Gson().toJson(functionConfig), null);
+    }
+
+    public static FunctionConfig createDefaultFunctionConfig() {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        return functionConfig;
+    }
+
+    public static FunctionDetails createDefaultFunctionDetails() {
+        FunctionConfig functionConfig = createDefaultFunctionConfig();
+        return FunctionConfigUtils.convert(functionConfig, null);
+    }
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
similarity index 99%
rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index ee9a014059..bec3ca9d40 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
@@ -87,12 +87,12 @@
 import static org.testng.Assert.assertEquals;
 
 /**
- * Unit test of {@link SinkApiV2Resource}.
+ * Unit test of {@link SinkApiV3Resource}.
  */
 @PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
-public class SinkApiV2ResourceTest {
+public class SinkApiV3ResourceTest {
 
     @ObjectFactory
     public IObjectFactory getObjectFactory() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
similarity index 99%
rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 194f624302..72d23e2658 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
@@ -86,12 +86,12 @@
 import static org.testng.Assert.assertEquals;
 
 /**
- * Unit test of {@link SourceApiV2Resource}.
+ * Unit test of {@link SourceApiV3Resource}.
  */
 @PrepareForTest({Utils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
-public class SourceApiV2ResourceTest {
+public class SourceApiV3ResourceTest {
 
     @ObjectFactory
     public IObjectFactory getObjectFactory() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services