You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/17 18:41:35 UTC

[GitHub] srkukarni closed pull request #2752: Function Serverside validation Part 2

srkukarni closed pull request #2752: Function Serverside validation Part 2
URL: https://github.com/apache/pulsar/pull/2752
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 132eaf3cdf..def4b2be47 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -84,7 +84,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
                                      final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
     }
 
     @PUT
@@ -106,7 +106,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
new file mode 100644
index 0000000000..2fe398973d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class SinkBase extends AdminResource implements Supplier<WorkerService> {
+
+    private final FunctionsImpl functions;
+
+    public SinkBase() {
+        this.functions = new FunctionsImpl(this);
+    }
+
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
+    }
+
+    @POST
+    @ApiOperation(value = "Creates a new Pulsar Sink in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerSink(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("sinkName") String sinkName,
+                                 final @FormDataParam("data") InputStream uploadedInputStream,
+                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String functionPkgUrl,
+                                 final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+    }
+
+    @PUT
+    @ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateSink(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("sinkName") String sinkName,
+                               final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                               final @FormDataParam("url") String functionPkgUrl,
+                               final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+
+    }
+
+
+    @DELETE
+    @ApiOperation(value = "Deletes a Pulsar Sink currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function doesn't exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "The function was successfully deleted")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    public Response deregisterSink(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("sinkName") String sinkName) {
+        return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about a Pulsar Sink currently running in cluster mode",
+            response = FunctionMetaData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    public Response getSinkInfo(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace,
+                                final @PathParam("sinkName") String sinkName) throws IOException {
+        return functions.getFunctionInfo(tenant, namespace, sinkName);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Sink instance",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
+    public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("sinkName") String sinkName,
+                                          final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Sink running in cluster mode",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}/status")
+    public Response getSinkStatus(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("sinkName") String sinkName) throws IOException {
+        return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Lists all Pulsar Sinks currently deployed in a given namespace",
+            response = String.class,
+            responseContainer = "Collection"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}")
+    public Response listSinks(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(tenant, namespace);
+
+    }
+
+    @POST
+    @ApiOperation(value = "Restart sink instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all sink instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+        return functions.restartFunctionInstances(tenant, namespace, sinkName);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop sink instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all sink instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+        return functions.stopFunctionInstances(tenant, namespace, sinkName);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO sink connectors currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
+    @Path("/builtinsinks")
+    public List<ConnectorDefinition> getSinkList() {
+        List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors();
+        List<ConnectorDefinition> retval = new ArrayList<>();
+        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+            if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
+                retval.add(connectorDefinition);
+            }
+        }
+        return retval;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
new file mode 100644
index 0000000000..1a82ac2880
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class SourceBase extends AdminResource implements Supplier<WorkerService> {
+
+    private final FunctionsImpl functions;
+
+    public SourceBase() {
+        this.functions = new FunctionsImpl(this);
+    }
+
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
+    }
+
+    @POST
+    @ApiOperation(value = "Creates a new Pulsar Source in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerSource(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("sourceName") String sourceName,
+                                   final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("url") String functionPkgUrl,
+                                   final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+    }
+
+    @PUT
+    @ApiOperation(value = "Updates a Pulsar Source currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateSource(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("sourceName") String sourceName,
+                                 final @FormDataParam("data") InputStream uploadedInputStream,
+                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String functionPkgUrl,
+                                 final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+
+    }
+
+
+    @DELETE
+    @ApiOperation(value = "Deletes a Pulsar Source currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function doesn't exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "The function was successfully deleted")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    public Response deregisterSource(final @PathParam("tenant") String tenant,
+                                       final @PathParam("namespace") String namespace,
+                                       final @PathParam("sourceName") String sourceName) {
+        return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about a Pulsar Source currently running in cluster mode",
+            response = FunctionMetaData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    public Response getSourceInfo(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("sourceName") String sourceName) throws IOException {
+        return functions.getFunctionInfo(
+            tenant, namespace, sourceName);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Source instance",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The function doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
+    public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant,
+                                            final @PathParam("namespace") String namespace,
+                                            final @PathParam("sourceName") String sourceName,
+                                            final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Source running in cluster mode",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}/status")
+    public Response getSourceStatus(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("sourceName") String sourceName) throws IOException {
+        return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri());
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Lists all Pulsar Sources currently deployed in a given namespace",
+            response = String.class,
+            responseContainer = "Collection"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}")
+    public Response listSources(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(
+            tenant, namespace);
+
+    }
+
+    @POST
+    @ApiOperation(value = "Restart source instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all source instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+        return functions.restartFunctionInstances(tenant, namespace, sourceName);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop source instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all source instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+        return functions.stopFunctionInstances(tenant, namespace, sourceName);
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO source connectors currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
+    @Path("/builtinsources")
+    public List<ConnectorDefinition> getSourceList() {
+        List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors();
+        List<ConnectorDefinition> retval = new ArrayList<>();
+        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+            if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
+                retval.add(connectorDefinition);
+            }
+        }
+        return retval;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
new file mode 100644
index 0000000000..aea0ae72f0
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SinkBase;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/sink")
+@Api(value = "/sink", description = "Sink admin apis", tags = "sink", hidden = true)
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Sink extends SinkBase {
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
new file mode 100644
index 0000000000..e5ef56c0f4
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SourceBase;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/source")
+@Api(value = "/source", description = "Source admin apis", tags = "source", hidden = true)
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Source extends SourceBase {
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 0f9219d84e..afa7ef1dd7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -23,10 +23,8 @@
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 
-import java.io.File;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.Map;
 import java.util.Optional;
@@ -44,14 +42,8 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.functions.api.utils.IdentityFunction;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
@@ -60,9 +52,7 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.gson.Gson;
 
 import jersey.repackaged.com.google.common.collect.Lists;
 
@@ -190,15 +180,13 @@ public void testFunctionAssignments() throws Exception {
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
 
-        String jarFilePathUrl = Utils.FILE + ":"
-                + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionDetails.Builder functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, tenant, namespacePortion,
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion,
                 functionName, "my.*", sinkTopic, subscriptionName);
-        functionDetailsBuilder.setParallelism(2);
-        FunctionDetails functionDetails = functionDetailsBuilder.build();
+        functionConfig.setParallelism(2);
 
         // (1) Create function with 2 instance
-        admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
         retryStrategically((test) -> {
             try {
                 return admin.topics().getStats(sinkTopic).subscriptions.size() == 1
@@ -213,10 +201,9 @@ public void testFunctionAssignments() throws Exception {
         assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(), 2);
 
         // (2) Update function with 1 instance
-        functionDetailsBuilder.setParallelism(1);
-        functionDetails = functionDetailsBuilder.build();
+        functionConfig.setParallelism(1);
         // try to update function to test: update-function functionality
-        admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl);
+        admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
         retryStrategically((test) -> {
             try {
                 return admin.topics().getStats(sinkTopic).subscriptions.size() == 1
@@ -245,19 +232,17 @@ public void testFunctionAssignmentsWithRestart() throws Exception {
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
         final FunctionRuntimeManager runtimeManager = functionsWorkerService.getFunctionRuntimeManager();
 
-        String jarFilePathUrl = Utils.FILE + ":"
-                + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionDetails.Builder functionDetailsBuilder = null;
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
+        FunctionConfig functionConfig = null;
         // (1) Register functions with 2 instances
         for (int i = 0; i < totalFunctions; i++) {
             String functionName = baseFunctionName + i;
-            functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, tenant, namespacePortion, functionName,
+            functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                     "my.*", sinkTopic, subscriptionName);
-            functionDetailsBuilder.setParallelism(parallelism);
+            functionConfig.setParallelism(parallelism);
             // set-auto-ack prop =true
-            functionDetailsBuilder.setAutoAck(true);
-            FunctionDetails functionDetails = functionDetailsBuilder.build();
-            admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+            functionConfig.setAutoAck(true);
+            admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
         }
         retryStrategically((test) -> {
             try {
@@ -275,13 +260,12 @@ public void testFunctionAssignmentsWithRestart() throws Exception {
         // (2) Update function with prop=auto-ack and Delete 2 functions
         for (int i = 0; i < totalFunctions; i++) {
             String functionName = baseFunctionName + i;
-            functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, tenant, namespacePortion, functionName,
+            functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                     "my.*", sinkTopic, subscriptionName);
-            functionDetailsBuilder.setParallelism(parallelism);
+            functionConfig.setParallelism(parallelism);
             // set-auto-ack prop =false
-            functionDetailsBuilder.setAutoAck(false);
-            FunctionDetails functionDetails = functionDetailsBuilder.build();
-            admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl);
+            functionConfig.setAutoAck(false);
+            admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
         }
 
         int totalDeletedFunction = 2;
@@ -328,47 +312,26 @@ public void testFunctionAssignmentsWithRestart() throws Exception {
         }
     }
 
-    protected static FunctionDetails.Builder createFunctionDetails(String jarFile, String tenant, String namespace,
-            String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
+    protected static FunctionConfig createFunctionConfig(String tenant, String namespace,
+                                                         String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
 
-        File file = new File(jarFile);
-        try {
-            Reflections.loadJar(file);
-        } catch (MalformedURLException e) {
-            throw new RuntimeException("Failed to load user jar " + file, e);
-        }
         String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
-        Class<?> typeArg = byte[].class;
-
-        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-        functionDetailsBuilder.setTenant(tenant);
-        functionDetailsBuilder.setNamespace(namespace);
-        functionDetailsBuilder.setName(functionName);
-        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
-        functionDetailsBuilder.setParallelism(1);
-        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
-
-        // set source spec
-        // source spec classname should be empty so that the default pulsar source will be used
-        SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-        sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
-        sourceSpecBuilder.setTypeClassName(typeArg.getName());
-        sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
-        sourceSpecBuilder.setSubscriptionName(subscriptionName);
-        sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, "");
-        functionDetailsBuilder.setAutoAck(true);
-        functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-        // set up sink spec
-        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-        // sinkSpecBuilder.setClassName(PulsarSink.class.getName());
-        sinkSpecBuilder.setTopic(sinkTopic);
-        Map<String, Object> sinkConfigMap = Maps.newHashMap();
-        sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
-        sinkSpecBuilder.setTypeClassName(typeArg.getName());
-        functionDetailsBuilder.setSink(sinkSpecBuilder);
-
-        return functionDetailsBuilder;
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setParallelism(1);
+        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        functionConfig.setTopicsPattern(sourceTopicPattern);
+        functionConfig.setSubName(subscriptionName);
+        functionConfig.setAutoAck(true);
+        functionConfig.setOutput(sinkTopic);
+
+        return functionConfig;
     }
 
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
similarity index 74%
rename from pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 36db99fd3d..babcdd6df4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -26,16 +26,10 @@
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
-import java.io.File;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
-import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.test.PortManager;
@@ -47,29 +41,17 @@
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
-import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -82,9 +64,7 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.gson.Gson;
 
 import jersey.repackaged.com.google.common.collect.Lists;
 
@@ -92,7 +72,7 @@
  * Test Pulsar sink on function
  *
  */
-public class PulsarSinkE2ETest {
+public class PulsarFunctionE2ETest {
     LocalBookkeeperEnsemble bkEnsemble;
 
     ServiceConfiguration config;
@@ -120,7 +100,7 @@
     private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
     private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
 
-    private static final Logger log = LoggerFactory.getLogger(PulsarSinkE2ETest.class);
+    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
 
     @DataProvider(name = "validRoleName")
     public Object[][] validRoleName() {
@@ -244,13 +224,31 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
         return new WorkerService(workerConfig);
     }
 
+    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
+        String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        functionConfig.setParallelism(1);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        functionConfig.setSubName(subscriptionName);
+        functionConfig.setTopicsPattern(sourceTopicPattern);
+        functionConfig.setAutoAck(true);
+        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setOutput(sinkTopic);
+        return functionConfig;
+    }
+
     /**
      * Validates pulsar sink e2e functionality on functions.
      *
      * @throws Exception
      */
     @Test(timeOut = 20000)
-    public void testE2EPulsarSink() throws Exception {
+    public void testE2EPulsarFunction() throws Exception {
 
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
@@ -265,17 +263,16 @@ public void testE2EPulsarSink() throws Exception {
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
 
         // create a producer that creates a topic at broker
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create();
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(sinkTopic).subscriptionName("sub").subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
 
-        String jarFilePathUrl = Utils.FILE + ":"
-                + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                 "my.*", sinkTopic, subscriptionName);
-        admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
 
         // try to update function to test: update-function functionality
-        admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl);
+        admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
 
         retryStrategically((test) -> {
             try {
@@ -290,7 +287,7 @@ public void testE2EPulsarSink() throws Exception {
         int totalMsgs = 5;
         for (int i = 0; i < totalMsgs; i++) {
             String data = "my-message-" + i;
-            producer.newMessage().property(propertyKey, propertyValue).value(data.getBytes()).send();
+            producer.newMessage().property(propertyKey, propertyValue).value(data).send();
         }
         retryStrategically((test) -> {
             try {
@@ -301,7 +298,7 @@ public void testE2EPulsarSink() throws Exception {
             }
         }, 5, 150);
 
-        Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+        Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
         String receivedPropertyValue = msg.getProperty(propertyKey);
         assertEquals(propertyValue, receivedPropertyValue);
 
@@ -313,7 +310,7 @@ public void testE2EPulsarSink() throws Exception {
     }
 
     @Test(timeOut = 20000)
-    public void testPulsarSinkStats() throws Exception {
+    public void testPulsarFunctionStats() throws Exception {
 
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
@@ -328,16 +325,15 @@ public void testPulsarSinkStats() throws Exception {
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
 
         // create a producer that creates a topic at broker
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
 
-        String jarFilePathUrl = Utils.FILE + ":"
-                + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                 "my.*", sinkTopic, subscriptionName);
-        admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
 
         // try to update function to test: update-function functionality
-        admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl);
+        admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
 
         retryStrategically((test) -> {
             try {
@@ -352,7 +348,7 @@ public void testPulsarSinkStats() throws Exception {
         int totalMsgs = 10;
         for (int i = 0; i < totalMsgs; i++) {
             String data = "my-message-" + i;
-            producer.newMessage().property(propertyKey, propertyValue).value(data.getBytes()).send();
+            producer.newMessage().property(propertyKey, propertyValue).value(data).send();
         }
         retryStrategically((test) -> {
             try {
@@ -382,49 +378,6 @@ public void testPulsarSinkStats() throws Exception {
         assertEquals(ownerWorkerId, workerId);
     }
 
-    protected static FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
-
-        File file = new File(jarFile);
-        try {
-            Reflections.loadJar(file);
-        } catch (MalformedURLException e) {
-            throw new RuntimeException("Failed to load user jar " + file, e);
-        }
-        String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
-        Class<?> typeArg = byte[].class;
-
-        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-        functionDetailsBuilder.setTenant(tenant);
-        functionDetailsBuilder.setNamespace(namespace);
-        functionDetailsBuilder.setName(functionName);
-        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
-        functionDetailsBuilder.setParallelism(1);
-        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
-        functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE);
-
-        // set source spec
-        // source spec classname should be empty so that the default pulsar source will be used
-        SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-        sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
-        sourceSpecBuilder.setTypeClassName(typeArg.getName());
-        sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
-        sourceSpecBuilder.setSubscriptionName(subscriptionName);
-        sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, "");
-        functionDetailsBuilder.setAutoAck(true);
-        functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-        // set up sink spec
-        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-        // sinkSpecBuilder.setClassName(PulsarSink.class.getName());
-        sinkSpecBuilder.setTopic(sinkTopic);
-        Map<String, Object> sinkConfigMap = Maps.newHashMap();
-        sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
-        sinkSpecBuilder.setTypeClassName(typeArg.getName());
-        functionDetailsBuilder.setSink(sinkSpecBuilder);
-
-        return functionDetailsBuilder.build();
-    }
-
     @Test(dataProvider = "validRoleName")
     public void testAuthorization(boolean validRoleName) throws Exception {
 
@@ -443,12 +396,11 @@ public void testAuthorization(boolean validRoleName) throws Exception {
         propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
 
-        String jarFilePathUrl = Utils.FILE + ":"
-                + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                 "my.*", sinkTopic, subscriptionName);
         try {
-            admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+            admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
             assertTrue(validRoleName);
         } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
             assertFalse(validRoleName);
@@ -466,45 +418,22 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sinkTopic = "persistent://" + replNamespace + "/output";
         final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
         admin.namespaces().createNamespace(replNamespace);
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
 
-        String jarFilePathUrl = Utils.FILE + ":"
-                + IdentityFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-
-        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-        functionDetailsBuilder.setTenant(tenant);
-        functionDetailsBuilder.setNamespace(namespacePortion);
-        functionDetailsBuilder.setName(functionName);
-        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
-        functionDetailsBuilder.setParallelism(1);
-        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
 
-        Class<?>[] typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes(new IdentityFunction(), false);
-
-        // set source spec
-        // source spec classname should be empty so that the default pulsar source will be used
-        SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-        sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
-        sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, TopicSchema.DEFAULT_SERDE);
-        functionDetailsBuilder.setAutoAck(true);
-        functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-        // set up sink spec
-        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-        sinkSpecBuilder.setTopic(sinkTopic);
-        Map<String, Object> sinkConfigMap = Maps.newHashMap();
-        sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
-        functionDetailsBuilder.setSink(sinkSpecBuilder);
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+                "my.*", sinkTopic, subscriptionName);
 
-        FunctionDetails functionDetails = functionDetailsBuilder.build();
-        admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
 
-        FunctionDetails functionMetadata = admin.functions().getFunction(tenant, namespacePortion, functionName);
+        FunctionDetails functionMetadata = admin.source().getSource(tenant, namespacePortion, functionName);
 
-        assertEquals(functionMetadata.getSource().getTypeClassName(), typeArgs[0].getName());
-        assertEquals(functionMetadata.getSink().getTypeClassName(), typeArgs[1].getName());
+        assertEquals(functionMetadata.getSource().getTypeClassName(), String.class.getName());
+        assertEquals(functionMetadata.getSink().getTypeClassName(), String.class.getName());
 
     }
 
@@ -523,13 +452,12 @@ public void testFunctionStopAndRestartApi() throws Exception {
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
 
         // create source topic
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
 
-        String jarFilePathUrl = Utils.FILE + ":"
-                + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                 sourceTopicName, sinkTopic, subscriptionName);
-        admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
 
         retryStrategically((test) -> {
             try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 2b32fd0c60..6d7dbd2f94 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -25,13 +25,17 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import java.io.File;
 import java.lang.reflect.Method;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -42,8 +46,12 @@
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -205,11 +213,11 @@ public void testAuthorization() throws Exception {
 
         String jarFilePathUrl = String.format("%s:%s", Utils.FILE,
                 PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
-        FunctionDetails functionDetails = PulsarSinkE2ETest.createSinkConfig(jarFilePathUrl, tenant, namespacePortion,
+        FunctionConfig functionConfig = createFunctionConfig(jarFilePathUrl, tenant, namespacePortion,
                 functionName, "my.*", sinkTopic, subscriptionName);
 
         try {
-            functionAdmin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+            functionAdmin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
             fail("Authentication should pass but call should fail with function already exist");
         } catch (PulsarAdminException e) {
             assertTrue(e.getMessage().contains("already exists"));
@@ -217,4 +225,31 @@ public void testAuthorization() throws Exception {
 
     }
 
+    protected static FunctionConfig createFunctionConfig(String jarFile, String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
+
+        File file = new File(jarFile);
+        try {
+            Reflections.loadJar(file);
+        } catch (MalformedURLException e) {
+            throw new RuntimeException("Failed to load user jar " + file, e);
+        }
+        String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
+        Class<?> typeArg = byte[].class;
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setParallelism(1);
+        functionConfig.setClassName(IdentityFunction.class.getName());
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        functionConfig.setSubName(subscriptionName);
+        functionConfig.setTopicsPattern(sourceTopicPattern);
+        functionConfig.setAutoAck(true);
+        functionConfig.setOutput(sinkTopic);
+
+        return functionConfig;
+    }
+
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/test/resources/pulsar-examples.jar b/pulsar-broker/src/test/resources/pulsar-examples.jar
new file mode 100644
index 0000000000..f07f094d7f
Binary files /dev/null and b/pulsar-broker/src/test/resources/pulsar-examples.jar differ
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index 79625bf740..e39f36213f 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -46,6 +46,12 @@
       <version>${project.version}</version>
     </dependency>
 
+      <dependency>
+          <groupId>${project.groupId}</groupId>
+          <artifactId>pulsar-functions-utils</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+
     <dependency>
       <groupId>org.glassfish.jersey.core</groupId>
       <artifactId>jersey-client</artifactId>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index bdb2c568bb..10c890c810 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -28,8 +28,7 @@
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.worker.WorkerInfo;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 
 /**
  * Admin interface for function management.
@@ -83,13 +82,13 @@
     /**
      * Create a new function.
      *
-     * @param functionDetails
+     * @param functionConfig
      *            the function configuration object
      *
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void createFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException;
+    void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException;
 
     /**
      * <pre>
@@ -99,19 +98,19 @@
      * Http: http://www.repo.com/fileName.jar
      * </pre>
      *
-     * @param functionDetails
+     * @param functionConfig
      *            the function configuration object
      * @param pkgUrl
      *            url from which pkg can be downloaded
      * @throws PulsarAdminException
      */
-    void createFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException;
+    void createFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException;
 
     /**
      * Update the configuration for a function.
      * <p>
      *
-     * @param functionDetails
+     * @param functionConfig
      *            the function configuration object
      *
      * @throws NotAuthorizedException
@@ -121,7 +120,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void updateFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException;
+    void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException;
 
     /**
      * Update the configuration for a function.
@@ -132,7 +131,7 @@
      * Http: http://www.repo.com/fileName.jar
      * </pre>
      *
-     * @param functionDetails
+     * @param functionConfig
      *            the function configuration object
      * @param pkgUrl
      *            url from which pkg can be downloaded
@@ -143,7 +142,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void updateFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException;
+    void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException;
 
     /**
      * Delete an existing function
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index a1502b0b05..0953d22cc2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -32,21 +32,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.pulsar.client.admin.internal.BookiesImpl;
-import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
-import org.apache.pulsar.client.admin.internal.BrokersImpl;
-import org.apache.pulsar.client.admin.internal.ClustersImpl;
-import org.apache.pulsar.client.admin.internal.FunctionsImpl;
-import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
-import org.apache.pulsar.client.admin.internal.LookupImpl;
-import org.apache.pulsar.client.admin.internal.NamespacesImpl;
-import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
-import org.apache.pulsar.client.admin.internal.SchemasImpl;
-import org.apache.pulsar.client.admin.internal.TopicsImpl;
-import org.apache.pulsar.client.admin.internal.WorkerImpl;
-import org.apache.pulsar.client.admin.internal.TenantsImpl;
-import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
-import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
+import org.apache.pulsar.client.admin.internal.*;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -85,6 +71,8 @@
     private final String serviceUrl;
     private final Lookup lookups;
     private final Functions functions;
+    private final Source source;
+    private final Sink sink;
     private final Worker worker;
     private final Schemas schemas;
     protected final WebTarget root;
@@ -189,6 +177,8 @@ public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData)
         this.resourceQuotas = new ResourceQuotasImpl(root, auth);
         this.lookups = new LookupImpl(root, auth, useTls);
         this.functions = new FunctionsImpl(root, auth);
+        this.source = new SourceImpl(root, auth);
+        this.sink = new SinkImpl(root, auth);
         this.worker = new WorkerImpl(root, auth);
         this.schemas = new SchemasImpl(root, auth);
         this.bookies = new BookiesImpl(root, auth);
@@ -357,6 +347,22 @@ public Functions functions() {
         return functions;
     }
 
+    /**
+     *
+     * @return the source management object
+     */
+    public Source source() {
+        return source;
+    }
+
+    /**
+     *
+     * @return the sink management object
+     */
+    public Sink sink() {
+        return sink;
+    }
+
     /**
     *
     * @return the Worker stats
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
new file mode 100644
index 0000000000..3f8fe2f478
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SinkConfig;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Admin interface for Sink management.
+ */
+public interface Sink {
+    /**
+     * Get the list of sinks.
+     * <p>
+     * Get the list of all the Pulsar Sinks.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["f1", "f2", "f3"]</code>
+     * </pre>
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<String> getSinks(String tenant, String namespace) throws PulsarAdminException;
+
+    /**
+     * Get the configuration for the specified sink.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @return the sink configuration
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to get the configuration of the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Function.FunctionDetails getSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+    /**
+     * Create a new sink.
+     *
+     * @param sinkConfig
+     *            the sink configuration object
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException;
+
+    /**
+     * <pre>
+     * Create a new sink by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * </pre>
+     *
+     * @param sinkConfig
+     *            the sink configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     * @throws PulsarAdminException
+     */
+    void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException;
+
+    /**
+     * Update the configuration for a sink.
+     * <p>
+     *
+     * @param sinkConfig
+     *            the sink configuration object
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException;
+
+    /**
+     * Update the configuration for a sink.
+     * <pre>
+     * Update a sink by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * </pre>
+     *
+     * @param sinkConfig
+     *            the sink configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException;
+
+    /**
+     * Delete an existing sink
+     * <p>
+     * Delete a sink
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Cluster does not exist
+     * @throws PreconditionFailedException
+     *             Cluster is not empty
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void deleteSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+    /**
+     * Gets the current status of a sink.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    FunctionStatusList getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+    /**
+     * Gets the current status of a sink instance.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     * @param id
+     *            Sink instance-id
+     * @return
+     * @throws PulsarAdminException
+     */
+    FunctionStatus getSinkStatus(String tenant, String namespace, String sink, int id)
+            throws PulsarAdminException;
+
+    /**
+     * Restart sink instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @param instanceId
+     *            Sink instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException;
+
+    /**
+     * Restart all sink instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+
+    /**
+     * Stop sink instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @param instanceId
+     *            Sink instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException;
+
+    /**
+     * Stop all sink instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+    /**
+     * Fetches a list of supported Pulsar IO sinks currently running in cluster mode
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     *
+     */
+    List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException;
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
new file mode 100644
index 0000000000..3c43cf203f
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SourceConfig;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Admin interface for Source management.
+ */
+public interface Source {
+    /**
+     * Get the list of sources.
+     * <p>
+     * Get the list of all the Pulsar Sources.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["f1", "f2", "f3"]</code>
+     * </pre>
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<String> getSources(String tenant, String namespace) throws PulsarAdminException;
+
+    /**
+     * Get the configuration for the specified source.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @return the source configuration
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to get the configuration of the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Function.FunctionDetails getSource(String tenant, String namespace, String source) throws PulsarAdminException;
+
+    /**
+     * Create a new source.
+     *
+     * @param sourceConfig
+     *            the source configuration object
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException;
+
+    /**
+     * <pre>
+     * Create a new source by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * </pre>
+     *
+     * @param sourceConfig
+     *            the source configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     * @throws PulsarAdminException
+     */
+    void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException;
+
+    /**
+     * Update the configuration for a source.
+     * <p>
+     *
+     * @param sourceConfig
+     *            the source configuration object
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException;
+
+    /**
+     * Update the configuration for a source.
+     * <pre>
+     * Update a source by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * </pre>
+     *
+     * @param sourceConfig
+     *            the source configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException;
+
+    /**
+     * Delete an existing source
+     * <p>
+     * Delete a source
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Cluster does not exist
+     * @throws PreconditionFailedException
+     *             Cluster is not empty
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException;
+
+    /**
+     * Gets the current status of a source.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    FunctionStatusList getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException;
+
+    /**
+     * Gets the current status of a source instance.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     * @param id
+     *            Source instance-id
+     * @return
+     * @throws PulsarAdminException
+     */
+    FunctionStatus getSourceStatus(String tenant, String namespace, String source, int id)
+            throws PulsarAdminException;
+
+    /**
+     * Restart source instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @param instanceId
+     *            Source instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException;
+
+    /**
+     * Restart all source instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartSource(String tenant, String namespace, String source) throws PulsarAdminException;
+
+
+    /**
+     * Stop source instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @param instanceId
+     *            Source instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException;
+
+    /**
+     * Stop all source instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopSource(String tenant, String namespace, String source) throws PulsarAdminException;
+
+    /**
+     * Fetches a list of supported Pulsar IO sources currently running in cluster mode
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     *
+     */
+    List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException;
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 99d0da2422..b9ea1a5244 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import com.google.gson.Gson;
 import com.google.protobuf.AbstractMessage.Builder;
 import com.google.protobuf.MessageOrBuilder;
 import com.google.protobuf.util.JsonFormat;
@@ -48,6 +49,7 @@
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.worker.WorkerInfo;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
@@ -129,7 +131,7 @@ public FunctionStatus getFunctionStatus(
     }
 
     @Override
-    public void createFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException {
+    public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
 
@@ -138,10 +140,10 @@ public void createFunction(FunctionDetails functionDetails, String fileName) thr
                 mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
             }
 
-            mp.bodyPart(new FormDataBodyPart("functionDetails",
-                printJson(functionDetails),
+            mp.bodyPart(new FormDataBodyPart("functionConfig",
+                new Gson().toJson(functionConfig),
                 MediaType.APPLICATION_JSON_TYPE));
-            request(functions.path(functionDetails.getTenant()).path(functionDetails.getNamespace()).path(functionDetails.getName()))
+            request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
                     .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
@@ -149,16 +151,16 @@ public void createFunction(FunctionDetails functionDetails, String fileName) thr
     }
 
     @Override
-    public void createFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException {
+    public void createFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
 
             mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
 
-            mp.bodyPart(new FormDataBodyPart("functionDetails",
-                printJson(functionDetails),
+            mp.bodyPart(new FormDataBodyPart("functionConfig",
+                new Gson().toJson(functionConfig),
                 MediaType.APPLICATION_JSON_TYPE));
-            request(functions.path(functionDetails.getTenant()).path(functionDetails.getNamespace()).path(functionDetails.getName()))
+            request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
                     .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
@@ -176,7 +178,7 @@ public void deleteFunction(String cluster, String namespace, String function) th
     }
 
     @Override
-    public void updateFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException {
+    public void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
 
@@ -185,10 +187,10 @@ public void updateFunction(FunctionDetails functionDetails, String fileName) thr
                 mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
             }
 
-            mp.bodyPart(new FormDataBodyPart("functionDetails",
-                printJson(functionDetails),
+            mp.bodyPart(new FormDataBodyPart("functionConfig",
+                new Gson().toJson(functionConfig),
                 MediaType.APPLICATION_JSON_TYPE));
-            request(functions.path(functionDetails.getTenant()).path(functionDetails.getNamespace()).path(functionDetails.getName()))
+            request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
                     .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
@@ -196,16 +198,16 @@ public void updateFunction(FunctionDetails functionDetails, String fileName) thr
     }
 
     @Override
-    public void updateFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException {
+    public void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
 
             mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
 
-            mp.bodyPart(new FormDataBodyPart("functionDetails", printJson(functionDetails),
+            mp.bodyPart(new FormDataBodyPart("functionConfig", new Gson().toJson(functionConfig),
                     MediaType.APPLICATION_JSON_TYPE));
-            request(functions.path(functionDetails.getTenant()).path(functionDetails.getNamespace())
-                    .path(functionDetails.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
+            request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace())
+                    .path(functionConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
                             ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
new file mode 100644
index 0000000000..4e13693cc0
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import com.google.gson.Gson;
+import com.google.protobuf.AbstractMessage.Builder;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Sink;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SinkConfig;
+import org.glassfish.jersey.media.multipart.FormDataBodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class SinkImpl extends BaseResource implements Sink {
+
+    private final WebTarget sink;
+
+    public SinkImpl(WebTarget web, Authentication auth) {
+        super(auth);
+        this.sink = web.path("/admin/v2/sink");
+    }
+
+    @Override
+    public List<String> getSinks(String tenant, String namespace) throws PulsarAdminException {
+        try {
+            Response response = request(sink.path(tenant).path(namespace)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new GenericType<List<String>>() {
+            });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionDetails getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
+        try {
+             Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+            mergeJson(jsonResponse, functionDetailsBuilder);
+            return functionDetailsBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionStatusList getSinkStatus(
+            String tenant, String namespace, String sinkName) throws PulsarAdminException {
+        try {
+            Response response = request(sink.path(tenant).path(namespace).path(sinkName).path("status")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionStatusList.Builder functionStatusBuilder = FunctionStatusList.newBuilder();
+            mergeJson(jsonResponse, functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionStatus getSinkStatus(
+            String tenant, String namespace, String sinkName, int id) throws PulsarAdminException {
+        try {
+            Response response = request(
+                    sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status"))
+                            .get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
+            mergeJson(jsonResponse, functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
+
+            mp.bodyPart(new FormDataBodyPart("sinkConfig",
+                new Gson().toJson(sinkConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+
+            mp.bodyPart(new FormDataBodyPart("sinkConfig",
+                    new Gson().toJson(sinkConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void deleteSink(String cluster, String namespace, String function) throws PulsarAdminException {
+        try {
+            request(sink.path(cluster).path(namespace).path(function))
+                    .delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
+
+            mp.bodyPart(new FormDataBodyPart("sinkConfig",
+                    new Gson().toJson(sinkConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
+                    .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+
+            mp.bodyPart(new FormDataBodyPart("sinkConfig", new Gson().toJson(sinkConfig),
+                    MediaType.APPLICATION_JSON_TYPE));
+            request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace())
+                    .path(sinkConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
+                            ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartSink(String tenant, String namespace, String functionName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            request(sink.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+                    .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartSink(String tenant, String namespace, String functionName) throws PulsarAdminException {
+        try {
+            request(sink.path(tenant).path(namespace).path(functionName).path("restart"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void stopSink(String tenant, String namespace, String sinkName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            request(sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId))
+                    .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void stopSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
+        try {
+            request(sink.path(tenant).path(namespace).path(sinkName).path("stop"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException {
+        try {
+            Response response = request(sink.path("builtinsinks")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new GenericType<List<ConnectorDefinition>>() {});
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+
+    public static void mergeJson(String json, Builder builder) throws IOException {
+        JsonFormat.parser().merge(json, builder);
+    }
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
new file mode 100644
index 0000000000..65a2bfc55a
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import com.google.gson.Gson;
+import com.google.protobuf.AbstractMessage.Builder;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Source;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SourceConfig;
+import org.glassfish.jersey.media.multipart.FormDataBodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class SourceImpl extends BaseResource implements Source {
+
+    private final WebTarget source;
+
+    public SourceImpl(WebTarget web, Authentication auth) {
+        super(auth);
+        this.source = web.path("/admin/v2/source");
+    }
+
+    @Override
+    public List<String> getSources(String tenant, String namespace) throws PulsarAdminException {
+        try {
+            Response response = request(source.path(tenant).path(namespace)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new GenericType<List<String>>() {
+            });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionDetails getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+        try {
+             Response response = request(source.path(tenant).path(namespace).path(sourceName)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+            mergeJson(jsonResponse, functionDetailsBuilder);
+            return functionDetailsBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionStatusList getSourceStatus(
+            String tenant, String namespace, String sourceName) throws PulsarAdminException {
+        try {
+            Response response = request(source.path(tenant).path(namespace).path(sourceName).path("status")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionStatusList.Builder functionStatusBuilder = FunctionStatusList.newBuilder();
+            mergeJson(jsonResponse, functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionStatus getSourceStatus(
+            String tenant, String namespace, String sourceName, int id) throws PulsarAdminException {
+        try {
+            Response response = request(
+                    source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(id)).path("status"))
+                            .get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
+            mergeJson(jsonResponse, functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
+
+            mp.bodyPart(new FormDataBodyPart("sourceConfig",
+                new Gson().toJson(sourceConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+
+            mp.bodyPart(new FormDataBodyPart("sourceConfig",
+                    new Gson().toJson(sourceConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void deleteSource(String cluster, String namespace, String function) throws PulsarAdminException {
+        try {
+            request(source.path(cluster).path(namespace).path(function))
+                    .delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
+
+            mp.bodyPart(new FormDataBodyPart("sourceConfig",
+                    new Gson().toJson(sourceConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
+                    .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+
+            mp.bodyPart(new FormDataBodyPart("sourceConfig", new Gson().toJson(sourceConfig),
+                    MediaType.APPLICATION_JSON_TYPE));
+            request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace())
+                    .path(sourceConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
+                            ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartSource(String tenant, String namespace, String functionName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            request(source.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+                    .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartSource(String tenant, String namespace, String functionName) throws PulsarAdminException {
+        try {
+            request(source.path(tenant).path(namespace).path(functionName).path("restart"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void stopSource(String tenant, String namespace, String sourceName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            request(source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(instanceId))
+                    .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void stopSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+        try {
+            request(source.path(tenant).path(namespace).path(sourceName).path("stop"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException {
+        try {
+            Response response = request(source.path("builtinsources")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new GenericType<List<ConnectorDefinition>>() {});
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+
+    public static void mergeJson(String json, Builder builder) throws IOException {
+        JsonFormat.parser().merge(json, builder);
+    }
+
+}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index de5a9e0d77..0c512b82a5 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -44,6 +44,7 @@
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.powermock.api.mockito.PowerMockito;
@@ -213,7 +214,7 @@ public void testCreateFunction() throws Exception {
         assertEquals(outputTopicName, creater.getOutput());
         assertEquals(false, creater.isAutoAck());
 
-        verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
 
     }
 
@@ -343,7 +344,7 @@ public void testCreateFunctionWithFileUrl() throws Exception {
         assertEquals(fnName, creater.getFunctionName());
         assertEquals(inputTopicName, creater.getInputs());
         assertEquals(outputTopicName, creater.getOutput());
-        verify(functions, times(1)).createFunctionWithUrl(any(FunctionDetails.class), anyString());
+        verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
     }
 
     @Test
@@ -420,7 +421,7 @@ public void testCreateFunctionWithTopicPatterns() throws Exception {
         assertEquals(topicPatterns, creater.getTopicsPattern());
         assertEquals(outputTopicName, creater.getOutput());
 
-        verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
 
     }
 
@@ -441,7 +442,7 @@ public void testCreateWithoutTenant() throws Exception {
 
         CreateFunction creater = cmd.getCreater();
         assertEquals("public", creater.getFunctionConfig().getTenant());
-        verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
     }
 
     @Test
@@ -461,7 +462,7 @@ public void testCreateWithoutNamespace() throws Exception {
         CreateFunction creater = cmd.getCreater();
         assertEquals("public", creater.getFunctionConfig().getTenant());
         assertEquals("default", creater.getFunctionConfig().getNamespace());
-        verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
     }
 
     @Test
@@ -486,7 +487,7 @@ public void testCreateUsingFullyQualifiedFunctionName() throws Exception {
         assertEquals(tenant, creater.getFunctionConfig().getTenant());
         assertEquals(namespace, creater.getFunctionConfig().getNamespace());
         assertEquals(functionName, creater.getFunctionConfig().getName());
-        verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
     }
 
     @Test
@@ -505,7 +506,7 @@ public void testCreateWithoutFunctionName() throws Exception {
 
         CreateFunction creater = cmd.getCreater();
         assertEquals("CmdFunctionsTest$DummyFunction", creater.getFunctionConfig().getName());
-        verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
     }
 
     @Test
@@ -522,7 +523,7 @@ public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
 
         CreateFunction creater = cmd.getCreater();
         assertNull(creater.getFunctionConfig().getOutput());
-        verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
 
     }
 
@@ -613,7 +614,7 @@ public void testUpdateFunction() throws Exception {
         assertEquals(inputTopicName, updater.getInputs());
         assertEquals(outputTopicName, updater.getOutput());
 
-        verify(functions, times(1)).updateFunction(any(FunctionDetails.class), anyString());
+        verify(functions, times(1)).updateFunction(any(FunctionConfig.class), anyString());
     }
 
     @Test
@@ -711,9 +712,9 @@ private void testValidateFunctionsConfigs(String[] correctArgs, String[] incorre
             }
 
             if (type.equals("create")) {
-                verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+                verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
             } else if (type.equals("update")) {
-                verify(functions, times(1)).updateFunction(any(FunctionDetails.class), anyString());
+                verify(functions, times(1)).updateFunction(any(FunctionConfig.class), anyString());
             }
 
             setup();
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c771e7b5d4..2f23a63039 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -684,9 +684,9 @@ void runCmd() throws Exception {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) {
-                admin.functions().createFunctionWithUrl(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getJar());
+                admin.functions().createFunctionWithUrl(functionConfig, functionConfig.getJar());
             } else {
-                admin.functions().createFunction(FunctionConfigUtils.convert(functionConfig, classLoader), userCodeFile);
+                admin.functions().createFunction(functionConfig, userCodeFile);
             }
 
             print("Created successfully");
@@ -776,9 +776,9 @@ void runCmd() throws Exception {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) {
-                admin.functions().updateFunctionWithUrl(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getJar());
+                admin.functions().updateFunctionWithUrl(functionConfig, functionConfig.getJar());
             } else {
-                admin.functions().updateFunction(FunctionConfigUtils.convert(functionConfig, classLoader), userCodeFile);
+                admin.functions().updateFunction(functionConfig, userCodeFile);
             }
             print("Updated successfully");
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index bd5af791a8..e56a34384e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -40,6 +40,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -79,7 +80,7 @@ public CmdSinks(PulsarAdmin admin) {
         jcommander.addCommand("update", updateSink);
         jcommander.addCommand("delete", deleteSink);
         jcommander.addCommand("localrun", localSinkRunner);
-        jcommander.addCommand("available-sinks", new ListSinks());
+        jcommander.addCommand("available-sinks", new ListBuiltInSinks());
     }
 
     /**
@@ -187,9 +188,9 @@ protected String validateSinkType(String sinkType) throws IOException {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
-                admin.functions().createFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
+                admin.sink().createSinkWithUrl(sinkConfig, sinkConfig.getArchive());
             } else {
-                admin.functions().createFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
+                admin.sink().createSink(sinkConfig, sinkConfig.getArchive());
             }
             print("Created successfully");
         }
@@ -200,9 +201,9 @@ void runCmd() throws Exception {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
-                admin.functions().updateFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
+                admin.sink().updateSinkWithUrl(sinkConfig, sinkConfig.getArchive());
             } else {
-                admin.functions().updateFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
+                admin.sink().updateSink(sinkConfig, sinkConfig.getArchive());
             }
             print("Updated successfully");
         }
@@ -282,6 +283,8 @@ void runCmd() throws Exception {
 
         protected SinkConfig sinkConfig;
 
+        protected NarClassLoader classLoader;
+
         private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = DEPRECATED_subsName;
             if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern = DEPRECATED_topicsPattern;
@@ -449,7 +452,6 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
             }
 
             // if jar file is present locally then load jar and validate SinkClass in it
-            ClassLoader classLoader = null;
             if (archivePath != null) {
                 if (!fileExists(archivePath)) {
                     throw new ParameterException("Archive file " + archivePath + " does not exist");
@@ -482,14 +484,14 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
                 throws IOException {
             org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
                     = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig)), functionDetailsBuilder);
+            Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig, classLoader)), functionDetailsBuilder);
             return functionDetailsBuilder.build();
         }
 
         protected String validateSinkType(String sinkType) throws IOException {
             Set<String> availableSinks;
             try {
-                availableSinks = admin.functions().getSinks();
+                availableSinks = admin.sink().getBuiltInSinks().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
             } catch (PulsarAdminException e) {
                 throw new IOException(e);
             }
@@ -533,16 +535,16 @@ void processArguments() throws Exception {
 
         @Override
         void runCmd() throws Exception {
-            admin.functions().deleteFunction(tenant, namespace, name);
+            admin.sink().deleteSink(tenant, namespace, name);
             print("Deleted successfully");
         }
     }
 
     @Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster")
-    public class ListSinks extends BaseCommand {
+    public class ListBuiltInSinks extends BaseCommand {
         @Override
         void runCmd() throws Exception {
-            admin.functions().getConnectorsList().stream().filter(x -> isNotBlank(x.getSinkClass()))
+            admin.sink().getBuiltInSinks().stream().filter(x -> isNotBlank(x.getSinkClass()))
                     .forEach(connector -> {
                         System.out.println(connector.getName());
                         System.out.println(WordUtils.wrap(connector.getDescription(), 80));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 5d0c84c90d..f2d768199e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -37,6 +37,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -79,7 +80,7 @@ public CmdSources(PulsarAdmin admin) {
         jcommander.addCommand("update", updateSource);
         jcommander.addCommand("delete", deleteSource);
         jcommander.addCommand("localrun", localSourceRunner);
-        jcommander.addCommand("available-sources", new ListSources());
+        jcommander.addCommand("available-sources", new ListBuiltInSources());
     }
 
     /**
@@ -187,9 +188,9 @@ protected String validateSourceType(String sourceType) throws IOException {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) {
-                admin.functions().createFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
+                admin.source().createSourceWithUrl(sourceConfig, sourceConfig.getArchive());
             } else {
-                admin.functions().createFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
+                admin.source().createSource(sourceConfig, sourceConfig.getArchive());
             }
             print("Created successfully");
         }
@@ -200,9 +201,9 @@ void runCmd() throws Exception {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
-                admin.functions().updateFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
+                admin.source().updateSourceWithUrl(sourceConfig, sourceConfig.getArchive());
             } else {
-                admin.functions().updateFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
+                admin.source().updateSource(sourceConfig, sourceConfig.getArchive());
             }
             print("Updated successfully");
         }
@@ -267,6 +268,8 @@ void runCmd() throws Exception {
 
         protected SourceConfig sourceConfig;
 
+        protected NarClassLoader classLoader;
+
         private void mergeArgs() {
             if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees;
             if (!StringUtils.isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName;
@@ -403,7 +406,6 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) {
 
 
             // if jar file is present locally then load jar and validate SinkClass in it
-            ClassLoader classLoader = null;
             if (archivePath != null) {
                 if (!fileExists(archivePath)) {
                     throw new ParameterException("Archive file " + archivePath + " does not exist");
@@ -436,14 +438,14 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) {
                 throws IOException {
             org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
                     = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig)), functionDetailsBuilder);
+            Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig, classLoader)), functionDetailsBuilder);
             return functionDetailsBuilder.build();
         }
 
         protected String validateSourceType(String sourceType) throws IOException {
             Set<String> availableSources;
             try {
-                availableSources = admin.functions().getSources();
+                availableSources = admin.source().getBuiltInSources().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
             } catch (PulsarAdminException e) {
                 throw new IOException(e);
             }
@@ -487,16 +489,16 @@ void processArguments() throws Exception {
 
         @Override
         void runCmd() throws Exception {
-            admin.functions().deleteFunction(tenant, namespace, name);
+            admin.source().deleteSource(tenant, namespace, name);
             print("Delete source successfully");
         }
     }
 
     @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster")
-    public class ListSources extends BaseCommand {
+    public class ListBuiltInSources extends BaseCommand {
         @Override
         void runCmd() throws Exception {
-            admin.functions().getConnectorsList().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass()))
+            admin.source().getBuiltInSources().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass()))
                     .forEach(connector -> {
                         System.out.println(connector.getName());
                         System.out.println(WordUtils.wrap(connector.getDescription(), 80));
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index fc0f180bc1..0c561453fd 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -26,7 +26,6 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
 
 import com.beust.jcommander.ParameterException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
@@ -39,7 +38,7 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
-import org.apache.pulsar.client.admin.Functions;
+import org.apache.pulsar.client.admin.Sink;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -97,7 +96,7 @@ public IObjectFactory getObjectFactory() {
     private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 0000 2018\"}";
 
     private PulsarAdmin pulsarAdmin;
-    private Functions functions;
+    private Sink sink;
     private CmdSinks cmdSinks;
     private CmdSinks.CreateSink createSink;
     private CmdSinks.UpdateSink updateSink;
@@ -108,8 +107,8 @@ public IObjectFactory getObjectFactory() {
     public void setup() throws Exception {
 
         pulsarAdmin = mock(PulsarAdmin.class);
-        functions = mock(Functions.class);
-        when(pulsarAdmin.functions()).thenReturn(functions);
+        sink = mock(Sink.class);
+        when(pulsarAdmin.sink()).thenReturn(sink);
 
         cmdSinks = spy(new CmdSinks(pulsarAdmin));
         createSink = spy(cmdSinks.getCreateSink());
@@ -435,7 +434,7 @@ public void testMissingArchive() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "The 'twitter' connector does not provide a sink implementation")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
     public void testInvalidJarWithNoSource() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
         sinkConfig.setArchive(WRONG_JAR_PATH);
@@ -783,7 +782,7 @@ public void testCmdSinkConfigFileInvalidJar() throws Exception {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "The 'twitter' connector does not provide a sink implementation")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
     public void testCmdSinkConfigFileInvalidJarNoSink() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
         testSinkConfig.setArchive(WRONG_JAR_PATH);
@@ -969,7 +968,7 @@ public void testDeleteMissingTenant() throws Exception {
 
         deleteSink.runCmd();
 
-        verify(functions).deleteFunction(eq(PUBLIC_TENANT), eq(NAMESPACE), eq(NAME));
+        verify(sink).deleteSink(eq(PUBLIC_TENANT), eq(NAMESPACE), eq(NAME));
     }
 
     @Test
@@ -982,7 +981,7 @@ public void testDeleteMissingNamespace() throws Exception {
 
         deleteSink.runCmd();
 
-        verify(functions).deleteFunction(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
+        verify(sink).deleteSink(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
     }
 
     @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink")
@@ -995,6 +994,6 @@ public void testDeleteMissingName() throws Exception {
 
         deleteSink.runCmd();
 
-        verify(functions).deleteFunction(eq(TENANT), eq(NAMESPACE), null);
+        verify(sink).deleteSink(eq(TENANT), eq(NAMESPACE), null);
     }
 }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 6548f2d73b..fe799bc61f 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -36,7 +36,7 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
-import org.apache.pulsar.client.admin.Functions;
+import org.apache.pulsar.client.admin.Source;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -80,7 +80,7 @@ public IObjectFactory getObjectFactory() {
     private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}";
 
     private PulsarAdmin pulsarAdmin;
-    private Functions functions;
+    private Source source;
     private CmdSources CmdSources;
     private CmdSources.CreateSource createSource;
     private CmdSources.UpdateSource updateSource;
@@ -91,8 +91,8 @@ public IObjectFactory getObjectFactory() {
     public void setup() throws Exception {
 
         pulsarAdmin = mock(PulsarAdmin.class);
-        functions = mock(Functions.class);
-        when(pulsarAdmin.functions()).thenReturn(functions);
+        source = mock(Source.class);
+        when(pulsarAdmin.source()).thenReturn(source);
 
         CmdSources = spy(new CmdSources(pulsarAdmin));
         createSource = spy(CmdSources.getCreateSource());
@@ -826,7 +826,7 @@ public void testDeleteMissingTenant() throws Exception {
 
         deleteSource.runCmd();
 
-        verify(functions).deleteFunction(eq(PUBLIC_TENANT), eq(NAMESPACE), eq(NAME));
+        verify(source).deleteSource(eq(PUBLIC_TENANT), eq(NAMESPACE), eq(NAME));
     }
 
     @Test
@@ -839,7 +839,7 @@ public void testDeleteMissingNamespace() throws Exception {
 
         deleteSource.runCmd();
 
-        verify(functions).deleteFunction(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
+        verify(source).deleteSource(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
     }
 
     @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the source")
@@ -852,6 +852,6 @@ public void testDeleteMissingName() throws Exception {
 
         deleteSource.runCmd();
 
-        verify(functions).deleteFunction(eq(TENANT), eq(NAMESPACE), null);
+        verify(source).deleteSource(eq(TENANT), eq(NAMESPACE), null);
     }
 }
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 2e43edc2e0..e173bc9b7b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -106,11 +106,11 @@
     private Map<String, Object> userConfig;
     private Runtime runtime;
     private boolean autoAck;
-    private int maxMessageRetries;
+    private int maxMessageRetries = -1;
     private String deadLetterTopic;
     private String subName;
     @isPositiveNumber
-    private int parallelism;
+    private int parallelism = 1;
     @isValidResources
     private Resources resources;
     private String fqfn;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index d44ea301a4..95803abb08 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -37,7 +37,7 @@
 
 public class SinkConfigUtils {
 
-    public static FunctionDetails convert(SinkConfig sinkConfig) throws IOException {
+    public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader classLoader) throws IOException {
 
         String sinkClassName = null;
         String typeArg = null;
@@ -53,11 +53,8 @@ public static FunctionDetails convert(SinkConfig sinkConfig) throws IOException
                 }
                 sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class
             } else {
-                sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
-                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()),
-                        Collections.emptySet())) {
-                    typeArg = Utils.getSinkType(sinkClassName, ncl).getName();
-                }
+                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+                typeArg = Utils.getSinkType(sinkClassName, classLoader).getName();
             }
         }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
index 38e200aefb..dfad9155e0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
@@ -23,17 +23,14 @@
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidSourceConfig;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
 
-import java.util.HashMap;
 import java.util.Map;
 
 @Getter
@@ -55,7 +52,7 @@
     @isValidTopicName
     private String topicName;
 
-    @isImplementationOfClass(implementsClass = SerDe.class)
+    @ConfigValidationAnnotations.isValidSerde
     private String serdeClassName;
 
     private String schemaType;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 73b331ad9d..a132c8a9ef 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -26,16 +26,14 @@
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 
 public class SourceConfigUtils {
 
-    public static FunctionDetails convert(SourceConfig sourceConfig)
+    public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader classLoader)
             throws IllegalArgumentException, IOException {
 
         String sourceClassName = null;
@@ -52,12 +50,8 @@ public static FunctionDetails convert(SourceConfig sourceConfig)
                 }
                 sourceClassName = sourceConfig.getClassName(); // server derives the arg-type by loading a class
             } else {
-                sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
-
-                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()),
-                        Collections.emptySet())) {
-                    typeArg = getSourceType(sourceClassName, ncl).getName();
-                }
+                sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
+                typeArg = getSourceType(sourceClassName, classLoader).getName();
             }
         }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 915df059dd..c6feb5ac72 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -46,59 +46,57 @@
     /**
      * Extract the Pulsar IO Source class from a connector archive.
      */
-    public static String getIOSourceClass(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
-            String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
-            ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
-                    ConnectorDefinition.class);
-            if (StringUtils.isEmpty(conf.getSourceClass())) {
-                throw new IOException(
-                        String.format("The '%s' connector does not provide a source implementation", conf.getName()));
-            }
+    public static String getIOSourceClass(ClassLoader classLoader) throws IOException {
+        NarClassLoader ncl = (NarClassLoader) classLoader;
+        String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+        ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+                ConnectorDefinition.class);
+        if (StringUtils.isEmpty(conf.getSourceClass())) {
+            throw new IOException(
+                    String.format("The '%s' connector does not provide a source implementation", conf.getName()));
+        }
 
-            try {
-                // Try to load source class and check it implements Source interface
-                Object instance = ncl.loadClass(conf.getSourceClass()).newInstance();
-                if (!(instance instanceof Source)) {
-                    throw new IOException("Class " + conf.getSourceClass() + " does not implement interface "
-                            + Source.class.getName());
-                }
-            } catch (Throwable t) {
-                Exceptions.rethrowIOException(t);
+        try {
+            // Try to load source class and check it implements Source interface
+            Object instance = ncl.loadClass(conf.getSourceClass()).newInstance();
+            if (!(instance instanceof Source)) {
+                throw new IOException("Class " + conf.getSourceClass() + " does not implement interface "
+                        + Source.class.getName());
             }
-
-            return conf.getSourceClass();
+        } catch (Throwable t) {
+            Exceptions.rethrowIOException(t);
         }
+
+        return conf.getSourceClass();
     }
 
     /**
      * Extract the Pulsar IO Sink class from a connector archive.
      */
-    public static String getIOSinkClass(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
-            String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+    public static String getIOSinkClass(ClassLoader classLoader) throws IOException {
+        NarClassLoader ncl = (NarClassLoader) classLoader;
+        String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+        ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+                ConnectorDefinition.class);
+        if (StringUtils.isEmpty(conf.getSinkClass())) {
+            throw new IOException(
+                    String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
+        }
 
-            ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
-                    ConnectorDefinition.class);
-            if (StringUtils.isEmpty(conf.getSinkClass())) {
+        try {
+            // Try to load source class and check it implements Sink interface
+            Object instance = ncl.loadClass(conf.getSinkClass()).newInstance();
+            if (!(instance instanceof Sink)) {
                 throw new IOException(
-                        String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
-            }
-
-            try {
-                // Try to load source class and check it implements Sink interface
-                Object instance = ncl.loadClass(conf.getSinkClass()).newInstance();
-                if (!(instance instanceof Sink)) {
-                    throw new IOException(
-                            "Class " + conf.getSinkClass() + " does not implement interface " + Sink.class.getName());
-                }
-            } catch (Throwable t) {
-                Exceptions.rethrowIOException(t);
+                        "Class " + conf.getSinkClass() + " does not implement interface " + Sink.class.getName());
             }
-
-            return conf.getSinkClass();
+        } catch (Throwable t) {
+            Exceptions.rethrowIOException(t);
         }
+
+        return conf.getSinkClass();
     }
 
     public static ConnectorDefinition getConnectorDefinition(String narPath) throws IOException {
@@ -127,14 +125,10 @@ public static Connectors searchForConnectors(String connectorsDirectory) throws
                     log.info("Found connector {} from {}", cntDef, archive);
 
                     if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        // Validate source class to be present and of the right type
-                        ConnectorUtils.getIOSourceClass(archive.toString());
                         connectors.sources.put(cntDef.getName(), archive);
                     }
 
                     if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        // Validate sinkclass to be present and of the right type
-                        ConnectorUtils.getIOSinkClass(archive.toString());
                         connectors.sinks.put(cntDef.getName(), archive);
                     }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
index 88ee41b4f9..0b600f4c78 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.utils.validation;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.utils.FunctionConfig;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index dba6bd91cd..08690fe855 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -39,7 +39,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -727,14 +726,15 @@ public void validateField(String name, Object o, ClassLoader classLoader) {
         @Override
         public void validateField(String name, Object o, ClassLoader classLoader) {
             SourceConfig sourceConfig = (SourceConfig) o;
-            if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) {
-                // We don't have to check the archive, since it's provided on the worker itself
+            if (classLoader == null) {
+                // This happens at the cli for builtin. There is no need to check this since
+                // the actual check will be done at serverside
                 return;
             }
 
             String sourceClassName;
             try {
-                sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
+                sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
             } catch (IOException e1) {
                 throw new IllegalArgumentException("Failed to extract source class from archive", e1);
             }
@@ -743,15 +743,14 @@ public void validateField(String name, Object o, ClassLoader classLoader) {
             Class<?> typeArg = getSourceType(sourceClassName, classLoader);
 
             // Only one of serdeClassName or schemaType should be set
-            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()
-                    && sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
+            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
                 throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
             }
 
-            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
+            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
                 FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, classLoader, false);
             }
-            if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
+            if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
                 FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, classLoader, false);
             }
         }
@@ -761,8 +760,9 @@ public void validateField(String name, Object o, ClassLoader classLoader) {
         @Override
         public void validateField(String name, Object o, ClassLoader classLoader) {
             SinkConfig sinkConfig = (SinkConfig) o;
-            if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) {
-                // We don't have to check the archive, since it's provided on the worker itself
+            if (classLoader == null) {
+                // This happens at the cli for builtin. There is no need to check this since
+                // the actual check will be done at serverside
                 return;
             }
 
@@ -779,42 +779,42 @@ public void validateField(String name, Object o, ClassLoader classLoader) {
             }
 
 
-            try (NarClassLoader clsLoader = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()),
-                    Collections.emptySet())) {
-                String sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
-                Class<?> typeArg = getSinkType(sinkClassName, clsLoader);
+            String sinkClassName;
+            try {
+                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+            } catch (IOException e1) {
+                throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
+            }
+            Class<?> typeArg = getSinkType(sinkClassName, classLoader);
 
-                if (sinkConfig.getTopicToSerdeClassName() != null) {
-                    sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
-                        FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader, true);
-                    });
-                }
+            if (sinkConfig.getTopicToSerdeClassName() != null) {
+                sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
+                    FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, classLoader, true);
+                });
+            }
 
-                if (sinkConfig.getTopicToSchemaType() != null) {
-                    sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
-                        FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader, true);
-                    });
-                }
+            if (sinkConfig.getTopicToSchemaType() != null) {
+                sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
+                    FunctionConfigValidator.validateSchema(schemaType, typeArg, name, classLoader, true);
+                });
+            }
 
-                // topicsPattern does not need checks
-
-                if (sinkConfig.getInputSpecs() != null) {
-                    sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
-                        // Only one is set
-                        if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()
-                                && consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
-                            throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
-                        }
-                        if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) {
-                            FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader, true);
-                        }
-                        if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
-                            FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader, true);
-                        }
-                    });
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException(e.getMessage());
+            // topicsPattern does not need checks
+
+            if (sinkConfig.getInputSpecs() != null) {
+                sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
+                    // Only one is set
+                    if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()
+                            && consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
+                        throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
+                    }
+                    if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) {
+                        FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, classLoader, true);
+                    }
+                    if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
+                        FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, classLoader, true);
+                    }
+                });
             }
         }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index aa80403009..f9359916a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -33,6 +33,7 @@
 import java.net.URL;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
 import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
 
@@ -47,6 +48,7 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
@@ -163,6 +165,32 @@ public static ClassLoader validateFileUrl(String destPkgUrl, String downloadPkgD
             throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
         }
     }
+
+    public static NarClassLoader extractNarClassloader(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException {
+        if (destPkgUrl.startsWith(FILE)) {
+            URL url = new URL(destPkgUrl);
+            File file = new File(url.toURI());
+            if (!file.exists()) {
+                throw new IllegalArgumentException(destPkgUrl + " does not exists locally");
+            }
+            try {
+                return NarClassLoader.getFromArchive(file, Collections.emptySet());
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException(
+                        "Corrupt User PackageFile " + file + " with error " + e.getMessage());
+            }
+        } else if (destPkgUrl.startsWith("http")) {
+            URL website = new URL(destPkgUrl);
+            File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString());
+            if (!tempFile.exists()) {
+                throw new IllegalArgumentException("Could not create local file " + tempFile);
+            }
+            tempFile.deleteOnExit();
+            return NarClassLoader.getFromArchive(tempFile, Collections.emptySet());
+        } else {
+            throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
+        }
+    }
     
     public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outputStream) throws IOException {
         URL website = new URL(destPkgUrl);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 26c71272c6..ac011db2f7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,6 +20,8 @@
 
 import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.SinkApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.SourceApiV2Resource;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
@@ -36,6 +38,8 @@ private Resources() {
         return new HashSet<>(
                 Arrays.asList(
                         FunctionApiV2Resource.class,
+                        SourceApiV2Resource.class,
+                        SinkApiV2Resource.class,
                         WorkerApiV2Resource.class,
                         MultiPartFeature.class
                 ));
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 6ab5f0d8ad..0b245cf391 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -27,7 +27,6 @@
 import static org.apache.pulsar.functions.utils.Utils.FILE;
 import static org.apache.pulsar.functions.utils.Utils.HTTP;
 import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
-import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create;
 
 import com.google.gson.Gson;
 
@@ -39,14 +38,9 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.CopyOption;
 import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.util.Base64;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
+import java.nio.file.Path;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -79,6 +73,7 @@
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
@@ -89,9 +84,7 @@
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
+import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -136,6 +129,7 @@ private boolean isWorkerServiceAvailable() {
     public Response registerFunction(final String tenant, final String namespace, final String functionName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
             final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
+            final String sourceConfigJson, final String sinkConfigJson,
             final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
@@ -173,10 +167,10 @@ public Response registerFunction(final String tenant, final String namespace, fi
         try {
             if (isPkgUrlProvided) {
                 functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
-                        functionDetailsJson, functionConfigJson);
+                        functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
             } else {
                 functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, functionConfigJson);
+                        fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
             }
         } catch (Exception e) {
             log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
@@ -216,7 +210,7 @@ public Response registerFunction(final String tenant, final String namespace, fi
     public Response updateFunction(final String tenant, final String namespace, final String functionName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
             final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
-            final String clientRole) {
+            final String sourceConfigJson, final String sinkConfigJson, final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -252,10 +246,10 @@ public Response updateFunction(final String tenant, final String namespace, fina
         try {
             if (isPkgUrlProvided) {
                 functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
-                        functionDetailsJson, functionConfigJson);
+                        functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
             } else {
                 functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, functionConfigJson);
+                        fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
             }
         } catch (Exception e) {
             log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
@@ -881,23 +875,24 @@ private void validateDeregisterRequestParams(String tenant, String namespace, St
     }
 
     private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName,
-            String functionPkgUrl, String functionDetailsJson, String functionConfigJson)
+            String functionPkgUrl, String functionDetailsJson, String functionConfigJson,
+            String sourceConfigJson, String sinkConfigJson)
             throws IllegalArgumentException, IOException, URISyntaxException {
         if (!isFunctionPackageUrlSupported(functionPkgUrl)) {
             throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
         }
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                functionDetailsJson, functionConfigJson, functionPkgUrl, null);
+                functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, functionPkgUrl, null);
         return functionDetails;
     }
 
     private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
             File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
-            String functionConfigJson)
+            String functionConfigJson, String sourceConfigJson, String sinkConfigJson)
             throws IllegalArgumentException, IOException, URISyntaxException {
 
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                functionDetailsJson, functionConfigJson, null, uploadedInputStreamAsFile);
+                functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, null, uploadedInputStreamAsFile);
         if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) {
             throw new IllegalArgumentException("Function Package is not provided");
         }
@@ -970,7 +965,8 @@ private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
     }
 
     private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
-            String functionDetailsJson, String functionConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
+            String functionDetailsJson, String functionConfigJson, String sourceConfigJson,
+            String sinkConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -981,11 +977,24 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
             throw new IllegalArgumentException("Function Name is not provided");
         }
 
-        if (StringUtils.isEmpty(functionDetailsJson) && StringUtils.isEmpty(functionConfigJson)) {
-            throw new IllegalArgumentException("FunctionConfig is not provided");
+        int numDefinitions = 0;
+        if (!StringUtils.isEmpty(functionDetailsJson)) {
+            numDefinitions++;
+        }
+        if (!StringUtils.isEmpty(functionConfigJson)) {
+            numDefinitions++;
+        }
+        if (!StringUtils.isEmpty(sourceConfigJson)) {
+            numDefinitions++;
         }
-        if (!StringUtils.isEmpty(functionDetailsJson) && !StringUtils.isEmpty(functionConfigJson)) {
-            throw new IllegalArgumentException("Only one of FunctionDetails or FunctionConfig should be provided");
+        if (!StringUtils.isEmpty(sinkConfigJson)) {
+            numDefinitions++;
+        }
+        if (numDefinitions == 0) {
+            throw new IllegalArgumentException("Function Info is not provided");
+        }
+        if (numDefinitions > 1) {
+            throw new IllegalArgumentException("Conflicting Info provided");
         }
         if (!StringUtils.isEmpty(functionConfigJson)) {
             FunctionConfig functionConfig = new Gson().fromJson(functionConfigJson, FunctionConfig.class);
@@ -999,6 +1008,18 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
             ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
         }
+        if (!StringUtils.isEmpty(sourceConfigJson)) {
+            SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class);
+            NarClassLoader clsLoader = extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, true);
+            ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
+            return SourceConfigUtils.convert(sourceConfig, clsLoader);
+        }
+        if (!StringUtils.isEmpty(sinkConfigJson)) {
+            SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class);
+            NarClassLoader clsLoader = extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, false);
+            ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
+            return SinkConfigUtils.convert(sinkConfig, clsLoader);
+        }
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
         org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder);
         if (isNotBlank(functionPkgUrl)) {
@@ -1058,6 +1079,59 @@ private ClassLoader extractClassLoader(String functionPkgUrl, File uploadedInput
         }
     }
 
+    public NarClassLoader extractNarClassLoader(String archive, String pkgUrl, File uploadedInputStreamFileName,
+                                                 boolean isSource) {
+        if (!StringUtils.isEmpty(archive)) {
+            String builtinArchive = archive;
+            if (archive.startsWith(org.apache.pulsar.functions.utils.Utils.BUILTIN)) {
+                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+            }
+            if (isSource) {
+                Path path;
+                try {
+                    path = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(String.format("No Source archive %s found", archive));
+                }
+                try {
+                    return NarClassLoader.getFromArchive(path.toFile(),
+                            Collections.emptySet());
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(String.format("The source %s is corrupted", archive));
+                }
+            } else {
+                Path path;
+                try {
+                    path = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(String.format("No Sink archive %s found", archive));
+                }
+                try {
+                    return NarClassLoader.getFromArchive(path.toFile(),
+                            Collections.emptySet());
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(String.format("The sink %s is corrupted", archive));
+                }
+            }
+        }
+        if (!StringUtils.isEmpty(pkgUrl)) {
+            try {
+                return Utils.extractNarClassloader(pkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());
+            } catch (Exception e) {
+                throw new IllegalArgumentException(e.getMessage());
+            }
+        }
+        if (uploadedInputStreamFileName != null) {
+            try {
+                return NarClassLoader.getFromArchive(uploadedInputStreamFileName,
+                        Collections.emptySet());
+            } catch (IOException e) {
+                throw new IllegalArgumentException(e.getMessage());
+            }
+        }
+        return null;
+    }
+
     private void validateFunctionClassTypes(ClassLoader classLoader, FunctionDetails.Builder functionDetailsBuilder) {
 
         // validate only if classLoader is provided
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 46c7974b82..d6e1439cbf 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -60,7 +60,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
                                      final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
 
     }
 
@@ -77,7 +77,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
 
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
new file mode 100644
index 0000000000..151c2c1ccf
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Path("/sink")
+public class SinkApiV2Resource extends FunctionApiResource {
+
+    @POST
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerSink(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("sinkName") String sinkName,
+                                 final @FormDataParam("data") InputStream uploadedInputStream,
+                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String functionPkgUrl,
+                                 final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+
+    }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateSink(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("sinkName") String sinkName,
+                               final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                               final @FormDataParam("url") String functionPkgUrl,
+                               final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+
+    }
+
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    public Response deregisterSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+        return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    public Response getSinkInfo(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace,
+                                final @PathParam("sinkName") String sinkName)
+            throws IOException {
+        return functions.getFunctionInfo(tenant, namespace, sinkName);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
+    public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("sinkName") String sinkName,
+                                          final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{sinkName}/status")
+    public Response getSinkStatus(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("sinkName") String sinkName) throws IOException {
+        return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}")
+    public Response listSink(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(tenant, namespace);
+
+    }
+
+    @POST
+    @ApiOperation(value = "Restart sink instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all sink instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+        return functions.restartFunctionInstances(tenant, namespace, sinkName);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop sink instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all sink instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+        return functions.stopFunctionInstances(tenant, namespace, sinkName);
+    }
+
+    @GET
+    @Path("/builtinsinks")
+    public List<ConnectorDefinition> getSinkList() {
+        List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors();
+        List<ConnectorDefinition> retval = new ArrayList<>();
+        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+            if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
+                retval.add(connectorDefinition);
+            }
+        }
+        return retval;
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
new file mode 100644
index 0000000000..44fac19d70
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Path("/source")
+public class SourceApiV2Resource extends FunctionApiResource {
+
+    @POST
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerSource(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("sourceName") String sourceName,
+                                   final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("url") String functionPkgUrl,
+                                   final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+
+    }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateSource(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("sourceName") String sourceName,
+                                 final @FormDataParam("data") InputStream uploadedInputStream,
+                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String functionPkgUrl,
+                                 final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+
+    }
+
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    public Response deregisterSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+        return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    public Response getSourceInfo(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("sourceName") String sourceName)
+            throws IOException {
+        return functions.getFunctionInfo(tenant, namespace, sourceName);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
+    public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant,
+                                            final @PathParam("namespace") String namespace,
+                                            final @PathParam("sourceName") String sourceName,
+                                            final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{sourceName}/status")
+    public Response getSourceStatus(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("sourceName") String sourceName) throws IOException {
+        return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri());
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}")
+    public Response listSources(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(tenant, namespace);
+
+    }
+
+    @POST
+    @ApiOperation(value = "Restart source instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all source instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+        return functions.restartFunctionInstances(tenant, namespace, sourceName);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop source instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all source instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+        return functions.stopFunctionInstances(tenant, namespace, sourceName);
+    }
+
+    @GET
+    @Path("/builtinsources")
+    public List<ConnectorDefinition> getSourceList() {
+        List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors();
+        List<ConnectorDefinition> retval = new ArrayList<>();
+        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+            if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
+                retval.add(connectorDefinition);
+            }
+        }
+        return retval;
+    }
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 9ce87d016e..d514faba69 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -52,7 +52,6 @@
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
@@ -66,8 +65,6 @@
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -98,22 +95,6 @@ public String process(String input, Context context) {
         }
     }
 
-    public static final class TestSink implements Sink<byte[]> {
-
-        @Override
-        public void close() {
-        }
-
-        @Override
-        public void open(Map config, SinkContext sinkContext) {
-        }
-
-        @Override
-        public void write(Record<byte[]> record) {
-        }
-    }
-
-
     private static final String tenant = "test-tenant";
     private static final String namespace = "test-namespace";
     private static final String function = "test-function";
@@ -282,22 +263,6 @@ public void testRegisterFunctionMissingClassName() throws IOException {
                 "Field 'className' cannot be null!");
     }
 
-    @Test
-    public void testRegisterFunctionMissingParallelism() throws IOException {
-        testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                className,
-                null,
-                "Field 'parallelism' must be a Positive Number");
-    }
-
     private void testRegisterFunctionMissingArguments(
             String tenant,
             String namespace,
@@ -346,6 +311,8 @@ private void testRegisterFunctionMissingArguments(
                 null,
                 null,
                 new Gson().toJson(functionConfig),
+                null,
+                null,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -372,7 +339,9 @@ private Response registerDefaultFunction() {
             null,
             null,
             new Gson().toJson(functionConfig),
-            null);
+            null,
+                null,
+                null);
     }
 
     @Test
@@ -580,21 +549,6 @@ public void testUpdateFunctionMissingClassName() throws IOException {
             parallelism,
                 "Field 'className' cannot be null!");
     }
-    @Test
-    public void testUpdateFunctionMissingParallelism() throws IOException {
-        testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                className,
-                null,
-                "Field 'parallelism' must be a Positive Number");
-    }
 
     private void testUpdateFunctionMissingArguments(
             String tenant,
@@ -646,7 +600,9 @@ private void testUpdateFunctionMissingArguments(
             null,
             null,
             new Gson().toJson(functionConfig),
-            null);
+            null,
+                null,
+                null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason);
@@ -673,7 +629,9 @@ private Response updateDefaultFunction() throws IOException {
             null,
             null,
             new Gson().toJson(functionConfig),
-            null);
+            null,
+                null,
+                null);
     }
 
     @Test
@@ -756,7 +714,9 @@ public void testUpdateFunctionWithUrl() throws IOException {
             filePackageUrl,
             null,
             new Gson().toJson(functionConfig),
-            null);
+            null,
+                null,
+                null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
@@ -1108,46 +1068,8 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException {
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
-                null, new Gson().toJson(functionConfig), null);
+                null, new Gson().toJson(functionConfig), null, null, null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
-
-    /*
-    TODO:- Needs to be moved to a source/sink specific unittest
-    @Test
-    public void testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException {
-        Configurator.setRootLevel(Level.DEBUG);
-
-        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        String filePackageUrl = "file://" + fileLocation;
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
-
-        RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
-        FunctionConfig functionConfig = new FunctionConfig();
-        functionConfig.setTenant(tenant);
-        functionConfig.setNamespace(namespace);
-        functionConfig.setName(function);
-        functionConfig.setClassName(className);
-        functionConfig.setParallelism(parallelism);
-        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
-        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
-        functionConfig.setOutput(outputTopic);
-        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
-        SinkSpec sinkSpec = SinkSpec.newBuilder().setClassName(className).setTopic(outputTopic)
-                .setSerDeClassName(outputSerdeClassName).build();
-        FunctionDetails functionDetails = FunctionDetails
-                .newBuilder().setTenant(tenant).setNamespace(namespace).setName(function).setSink(sinkSpec)
-                .setClassName(className).setParallelism(parallelism).setSource(SourceSpec.newBuilder()
-                        .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
-                .build();
-        Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
-                null, new Gson().toJson(functionConfig), null);
-
-        assertEquals(response.getStatus(), Status.BAD_REQUEST.getStatusCode());
-    }
-    */
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
new file mode 100644
index 0000000000..4e52c95442
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -0,0 +1,905 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.SinkConfig;
+import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.worker.*;
+import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.*;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link SinkApiV2Resource}.
+ */
+@PrepareForTest({Utils.class, SinkConfigUtils.class})
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
+@Slf4j
+public class SinkApiV2ResourceTest {
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    private static final class TestSink implements Sink<String> {
+
+        @Override public void open(final Map<String, Object> config, SinkContext sinkContext) {
+        }
+
+        @Override public void write(Record<String> record) { }
+
+        @Override public void close() { }
+    }
+
+    private static final String tenant = "test-tenant";
+    private static final String namespace = "test-namespace";
+    private static final String sink = "test-sink";
+    private static final Map<String, String> topicsToSerDeClassName = new HashMap<>();
+    static {
+        topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE);
+    }
+    private static final String subscriptionName = "test-subscription";
+    private static final String className = TestSink.class.getName();
+    private static final String serde = TopicSchema.DEFAULT_SERDE;
+    private static final int parallelism = 1;
+
+    private WorkerService mockedWorkerService;
+    private FunctionMetaDataManager mockedManager;
+    private FunctionRuntimeManager mockedFunctionRunTimeManager;
+    private RuntimeFactory mockedRuntimeFactory;
+    private Namespace mockedNamespace;
+    private FunctionsImpl resource;
+    private InputStream mockedInputStream;
+    private FormDataContentDisposition mockedFormData;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        this.mockedManager = mock(FunctionMetaDataManager.class);
+        this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class);
+        this.mockedRuntimeFactory = mock(RuntimeFactory.class);
+        this.mockedInputStream = mock(InputStream.class);
+        this.mockedNamespace = mock(Namespace.class);
+        this.mockedFormData = mock(FormDataContentDisposition.class);
+        when(mockedFormData.getFileName()).thenReturn("test");
+
+        this.mockedWorkerService = mock(WorkerService.class);
+        when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+        when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
+        when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
+        when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
+        when(mockedWorkerService.isInitialized()).thenReturn(true);
+
+        // worker config
+        WorkerConfig workerConfig = new WorkerConfig()
+            .setWorkerId("test")
+            .setWorkerPort(8080)
+            .setDownloadDirectory("/tmp/pulsar/functions")
+            .setFunctionMetadataTopicName("pulsar/functions")
+            .setNumFunctionPackageReplicas(3)
+            .setPulsarServiceUrl("pulsar://localhost:6650/");
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
+
+        this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
+        doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
+        mockStatic(SinkConfigUtils.class);
+        when(SinkConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+    }
+
+    //
+    // Register Functions
+    //
+
+    @Test
+    public void testRegisterSinkMissingTenant() throws IOException {
+        testRegisterSinkMissingArguments(
+            null,
+            namespace,
+                sink,
+            mockedInputStream,
+            mockedFormData,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Tenant is not provided");
+    }
+
+    @Test
+    public void testRegisterSinkMissingNamespace() throws IOException {
+        testRegisterSinkMissingArguments(
+            tenant,
+            null,
+                sink,
+            mockedInputStream,
+            mockedFormData,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Namespace is not provided");
+    }
+
+    @Test
+    public void testRegisterSinkMissingFunctionName() throws IOException {
+        testRegisterSinkMissingArguments(
+            tenant,
+            namespace,
+            null,
+            mockedInputStream,
+            mockedFormData,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Function Name is not provided");
+    }
+
+    @Test
+    public void testRegisterSinkMissingPackage() throws IOException {
+        testRegisterSinkMissingArguments(
+            tenant,
+            namespace,
+                sink,
+            null,
+            mockedFormData,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Function Package is not provided");
+    }
+
+    @Test
+    public void testRegisterSinkMissingPackageDetails() throws IOException {
+        testRegisterSinkMissingArguments(
+            tenant,
+            namespace,
+                sink,
+            mockedInputStream,
+            null,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Function Package is not provided");
+    }
+
+    private void testRegisterSinkMissingArguments(
+            String tenant,
+            String namespace,
+            String sink,
+            InputStream inputStream,
+            FormDataContentDisposition details,
+            Map<String, String> inputTopicMap,
+            String className,
+            Integer parallelism,
+            String errorExpected) throws IOException {
+        SinkConfig sinkConfig = new SinkConfig();
+        if (tenant != null) {
+            sinkConfig.setTenant(tenant);
+        }
+        if (namespace != null) {
+            sinkConfig.setNamespace(namespace);
+        }
+        if (sink != null) {
+            sinkConfig.setName(sink);
+        }
+        if (inputTopicMap != null) {
+            sinkConfig.setTopicToSerdeClassName(inputTopicMap);
+        }
+        if (className != null) {
+            sinkConfig.setClassName(className);
+        }
+        if (parallelism != null) {
+            sinkConfig.setParallelism(parallelism);
+        }
+
+        Response response = resource.registerFunction(
+                tenant,
+                namespace,
+                sink,
+                inputStream,
+                details,
+                null,
+                null,
+                null,
+                null,
+                new Gson().toJson(sinkConfig),
+                null);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason);
+    }
+
+    private Response registerDefaultSink() {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sink);
+        sinkConfig.setClassName(className);
+        sinkConfig.setParallelism(parallelism);
+        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        return resource.registerFunction(
+            tenant,
+            namespace,
+                sink,
+            mockedInputStream,
+            mockedFormData,
+            null,
+            null,
+            null,
+            null,
+            new Gson().toJson(sinkConfig),
+                null);
+    }
+
+    @Test
+    public void testRegisterExistedSink() throws IOException {
+        Configurator.setRootLevel(Level.DEBUG);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        Response response = registerDefaultSink();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterSinkUploadFailure() throws Exception {
+        mockStatic(Utils.class);
+        doThrow(new IOException("upload failure")).when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+
+        Response response = registerDefaultSink();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterSinkSuccess() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(true)
+            .setMessage("source registered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = registerDefaultSink();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
+    @Test
+    public void testRegisterSinkFailure() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(false)
+            .setMessage("source failed to register");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = registerDefaultSink();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterSinkInterrupted() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+
+        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+            new IOException("Function registeration interrupted"));
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = registerDefaultSink();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    //
+    // Update Functions
+    //
+
+    @Test
+    public void testUpdateSinkMissingTenant() throws IOException {
+        testUpdateSinkMissingArguments(
+            null,
+            namespace,
+                sink,
+            mockedInputStream,
+            mockedFormData,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Tenant is not provided");
+    }
+
+    @Test
+    public void testUpdateSinkMissingNamespace() throws IOException {
+        testUpdateSinkMissingArguments(
+            tenant,
+            null,
+                sink,
+            mockedInputStream,
+            mockedFormData,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Namespace is not provided");
+    }
+
+    @Test
+    public void testUpdateSinkMissingFunctionName() throws IOException {
+        testUpdateSinkMissingArguments(
+            tenant,
+            namespace,
+            null,
+            mockedInputStream,
+            mockedFormData,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Function Name is not provided");
+    }
+
+    @Test
+    public void testUpdateSinkMissingPackage() throws IOException {
+        testUpdateSinkMissingArguments(
+            tenant,
+            namespace,
+                sink,
+            null,
+            mockedFormData,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Function Package is not provided");
+    }
+
+    @Test
+    public void testUpdateSinkMissingPackageDetails() throws IOException {
+        testUpdateSinkMissingArguments(
+            tenant,
+            namespace,
+                sink,
+            mockedInputStream,
+            null,
+            topicsToSerDeClassName,
+            className,
+            parallelism,
+                "Function Package is not provided");
+    }
+
+    private void testUpdateSinkMissingArguments(
+            String tenant,
+            String namespace,
+            String sink,
+            InputStream inputStream,
+            FormDataContentDisposition details,
+            Map<String, String> inputTopicsMap,
+            String className,
+            Integer parallelism,
+            String expectedError) throws IOException {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        SinkConfig sinkConfig = new SinkConfig();
+        if (tenant != null) {
+            sinkConfig.setTenant(tenant);
+        }
+        if (namespace != null) {
+            sinkConfig.setNamespace(namespace);
+        }
+        if (sink != null) {
+            sinkConfig.setName(sink);
+        }
+        if (inputTopicsMap != null) {
+            sinkConfig.setTopicToSerdeClassName(inputTopicsMap);
+        }
+        if (className != null) {
+            sinkConfig.setClassName(className);
+        }
+        if (parallelism != null) {
+            sinkConfig.setParallelism(parallelism);
+        }
+
+        Response response = resource.updateFunction(
+            tenant,
+            namespace,
+            sink,
+            inputStream,
+            details,
+            null,
+            null,
+            null,
+            null,
+            new Gson().toJson(sinkConfig),
+                null);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason);
+    }
+
+    private Response updateDefaultSink() throws IOException {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sink);
+        sinkConfig.setClassName(className);
+        sinkConfig.setParallelism(parallelism);
+        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+
+        return resource.updateFunction(
+            tenant,
+            namespace,
+                sink,
+            mockedInputStream,
+            mockedFormData,
+            null,
+            null,
+            null,
+            null,
+            new Gson().toJson(sinkConfig),
+                null);
+    }
+
+    @Test
+    public void testUpdateNotExistedSink() throws IOException {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+
+        Response response = updateDefaultSink();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testUpdateSinkUploadFailure() throws Exception {
+        mockStatic(Utils.class);
+        doThrow(new IOException("upload failure")).when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        Response response = updateDefaultSink();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testUpdateSinkSuccess() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(true)
+            .setMessage("source registered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = updateDefaultSink();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
+    @Test
+    public void testUpdateSinkWithUrl() throws IOException {
+        Configurator.setRootLevel(Level.DEBUG);
+
+        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String filePackageUrl = "file://" + fileLocation;
+
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sink);
+        sinkConfig.setClassName(className);
+        sinkConfig.setParallelism(parallelism);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+        RequestResult rr = new RequestResult()
+                .setSuccess(true)
+                .setMessage("source registered");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = resource.updateFunction(
+            tenant,
+            namespace,
+                sink,
+            null,
+            null,
+            filePackageUrl,
+            null,
+            null,
+            null,
+            new Gson().toJson(sinkConfig),
+                null);
+
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
+    @Test
+    public void testUpdateSinkFailure() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(false)
+            .setMessage("source failed to register");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = updateDefaultSink();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testUpdateSinkInterrupted() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+            new IOException("Function registeration interrupted"));
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = updateDefaultSink();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    //
+    // deregister source
+    //
+
+    @Test
+    public void testDeregisterSinkMissingTenant() throws Exception {
+        testDeregisterSinkMissingArguments(
+            null,
+            namespace,
+                sink,
+            "Tenant");
+    }
+
+    @Test
+    public void testDeregisterSinkMissingNamespace() throws Exception {
+        testDeregisterSinkMissingArguments(
+            tenant,
+            null,
+                sink,
+            "Namespace");
+    }
+
+    @Test
+    public void testDeregisterSinkMissingFunctionName() throws Exception {
+        testDeregisterSinkMissingArguments(
+            tenant,
+            namespace,
+            null,
+            "Function Name");
+    }
+
+    private void testDeregisterSinkMissingArguments(
+        String tenant,
+        String namespace,
+        String sink,
+        String missingFieldName
+    ) {
+        Response response = resource.deregisterFunction(
+            tenant,
+            namespace,
+            sink,
+            null);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    private Response deregisterDefaultSink() {
+        return resource.deregisterFunction(
+            tenant,
+            namespace,
+                sink,
+            null);
+    }
+
+    @Test
+    public void testDeregisterNotExistedSink() {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+
+        Response response = deregisterDefaultSink();
+        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testDeregisterSinkSuccess() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(true)
+            .setMessage("source deregistered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
+
+        Response response = deregisterDefaultSink();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(rr.toJson(), response.getEntity());
+    }
+
+    @Test
+    public void testDeregisterSinkFailure() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(false)
+            .setMessage("source failed to deregister");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
+
+        Response response = deregisterDefaultSink();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testDeregisterSinkInterrupted() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+            new IOException("Function deregisteration interrupted"));
+        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
+
+        Response response = deregisterDefaultSink();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    // Source Info doesn't exist. Maybe one day they might be added
+    //
+    // Get Function Info
+    //
+
+    /*
+    @Test
+    public void testGetFunctionMissingTenant() throws Exception {
+        testGetFunctionMissingArguments(
+            null,
+            namespace,
+                source,
+            "Tenant");
+    }
+
+    @Test
+    public void testGetFunctionMissingNamespace() throws Exception {
+        testGetFunctionMissingArguments(
+            tenant,
+            null,
+                source,
+            "Namespace");
+    }
+
+    @Test
+    public void testGetFunctionMissingFunctionName() throws Exception {
+        testGetFunctionMissingArguments(
+            tenant,
+            namespace,
+            null,
+            "Function Name");
+    }
+
+    private void testGetFunctionMissingArguments(
+        String tenant,
+        String namespace,
+        String function,
+        String missingFieldName
+    ) throws IOException {
+        Response response = resource.getFunctionInfo(
+            tenant,
+            namespace,
+            function);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    private Response getDefaultFunctionInfo() throws IOException {
+        return resource.getFunctionInfo(
+            tenant,
+            namespace,
+                source);
+    }
+
+    @Test
+    public void testGetNotExistedFunction() throws IOException {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+        Response response = getDefaultFunctionInfo();
+        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testGetFunctionSuccess() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        SinkSpec sinkSpec = SinkSpec.newBuilder()
+                .setTopic(outputTopic)
+                .setSerDeClassName(outputSerdeClassName).build();
+        FunctionDetails functionDetails = FunctionDetails.newBuilder()
+                .setClassName(className)
+                .setSink(sinkSpec)
+                .setName(source)
+                .setNamespace(namespace)
+                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
+                .setTenant(tenant)
+                .setParallelism(parallelism)
+                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
+                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+        FunctionMetaData metaData = FunctionMetaData.newBuilder()
+            .setCreateTime(System.currentTimeMillis())
+            .setFunctionDetails(functionDetails)
+            .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
+            .setVersion(1234)
+            .build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData);
+
+        Response response = getDefaultFunctionInfo();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            response.getEntity());
+    }
+
+    //
+    // List Functions
+    //
+
+    @Test
+    public void testListFunctionsMissingTenant() throws Exception {
+        testListFunctionsMissingArguments(
+            null,
+            namespace,
+            "Tenant");
+    }
+
+    @Test
+    public void testListFunctionsMissingNamespace() throws Exception {
+        testListFunctionsMissingArguments(
+            tenant,
+            null,
+            "Namespace");
+    }
+
+    private void testListFunctionsMissingArguments(
+        String tenant,
+        String namespace,
+        String missingFieldName
+    ) {
+        Response response = resource.listFunctions(
+            tenant,
+            namespace);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    private Response listDefaultFunctions() {
+        return resource.listFunctions(
+            tenant,
+            namespace);
+    }
+
+    @Test
+    public void testListFunctionsSuccess() throws Exception {
+        List<String> functions = Lists.newArrayList("test-1", "test-2");
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+
+        Response response = listDefaultFunctions();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(new Gson().toJson(functions), response.getEntity());
+    }
+    */
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
new file mode 100644
index 0000000000..1ca869ea8b
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -0,0 +1,916 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.proto.Function.*;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.SourceConfig;
+import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.worker.*;
+import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.*;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.*;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link SourceApiV2Resource}.
+ */
+@PrepareForTest({Utils.class,SourceConfigUtils.class})
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
+@Slf4j
+public class SourceApiV2ResourceTest {
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    private static final class TestSource implements Source<String> {
+
+        @Override public void open(final Map<String, Object> config, SourceContext sourceContext) {
+        }
+
+        @Override public Record<String> read() { return null; }
+
+        @Override public void close() { }
+    }
+
+    private static final String tenant = "test-tenant";
+    private static final String namespace = "test-namespace";
+    private static final String source = "test-source";
+    private static final String outputTopic = "test-output-topic";
+    private static final String outputSerdeClassName = TopicSchema.DEFAULT_SERDE;
+    private static final String className = TestSource.class.getName();
+    private static final String serde = TopicSchema.DEFAULT_SERDE;
+    private static final int parallelism = 1;
+
+    private WorkerService mockedWorkerService;
+    private FunctionMetaDataManager mockedManager;
+    private FunctionRuntimeManager mockedFunctionRunTimeManager;
+    private RuntimeFactory mockedRuntimeFactory;
+    private Namespace mockedNamespace;
+    private FunctionsImpl resource;
+    private InputStream mockedInputStream;
+    private FormDataContentDisposition mockedFormData;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        this.mockedManager = mock(FunctionMetaDataManager.class);
+        this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class);
+        this.mockedRuntimeFactory = mock(RuntimeFactory.class);
+        this.mockedInputStream = mock(InputStream.class);
+        this.mockedNamespace = mock(Namespace.class);
+        this.mockedFormData = mock(FormDataContentDisposition.class);
+        when(mockedFormData.getFileName()).thenReturn("test");
+
+        this.mockedWorkerService = mock(WorkerService.class);
+        when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+        when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
+        when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
+        when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
+        when(mockedWorkerService.isInitialized()).thenReturn(true);
+
+        // worker config
+        WorkerConfig workerConfig = new WorkerConfig()
+            .setWorkerId("test")
+            .setWorkerPort(8080)
+            .setDownloadDirectory("/tmp/pulsar/functions")
+            .setFunctionMetadataTopicName("pulsar/functions")
+            .setNumFunctionPackageReplicas(3)
+            .setPulsarServiceUrl("pulsar://localhost:6650/");
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
+
+        this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
+        doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
+        mockStatic(SourceConfigUtils.class);
+        when(SourceConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+    }
+
+    //
+    // Register Functions
+    //
+
+    @Test
+    public void testRegisterSourceMissingTenant() throws IOException {
+        testRegisterSourceMissingArguments(
+            null,
+            namespace,
+                source,
+            mockedInputStream,
+            mockedFormData,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Tenant is not provided");
+    }
+
+    @Test
+    public void testRegisterSourceMissingNamespace() throws IOException {
+        testRegisterSourceMissingArguments(
+            tenant,
+            null,
+                source,
+            mockedInputStream,
+            mockedFormData,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Namespace is not provided");
+    }
+
+    @Test
+    public void testRegisterSourceMissingFunctionName() throws IOException {
+        testRegisterSourceMissingArguments(
+            tenant,
+            namespace,
+            null,
+            mockedInputStream,
+            mockedFormData,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Function Name is not provided");
+    }
+
+    @Test
+    public void testRegisterSourceMissingPackage() throws IOException {
+        testRegisterSourceMissingArguments(
+            tenant,
+            namespace,
+                source,
+            null,
+            mockedFormData,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Function Package is not provided");
+    }
+
+    @Test
+    public void testRegisterSourceMissingPackageDetails() throws IOException {
+        testRegisterSourceMissingArguments(
+            tenant,
+            namespace,
+                source,
+            mockedInputStream,
+            null,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Function Package is not provided");
+    }
+
+    private void testRegisterSourceMissingArguments(
+            String tenant,
+            String namespace,
+            String function,
+            InputStream inputStream,
+            FormDataContentDisposition details,
+            String outputTopic,
+            String outputSerdeClassName,
+            String className,
+            Integer parallelism,
+            String errorExpected) throws IOException {
+        SourceConfig sourceConfig = new SourceConfig();
+        if (tenant != null) {
+            sourceConfig.setTenant(tenant);
+        }
+        if (namespace != null) {
+            sourceConfig.setNamespace(namespace);
+        }
+        if (function != null) {
+            sourceConfig.setName(function);
+        }
+        if (outputTopic != null) {
+            sourceConfig.setTopicName(outputTopic);
+        }
+        if (outputSerdeClassName != null) {
+            sourceConfig.setSerdeClassName(outputSerdeClassName);
+        }
+        if (className != null) {
+            sourceConfig.setClassName(className);
+        }
+        if (parallelism != null) {
+            sourceConfig.setParallelism(parallelism);
+        }
+
+        Response response = resource.registerFunction(
+                tenant,
+                namespace,
+                function,
+                inputStream,
+                details,
+                null,
+                null,
+                null,
+                new Gson().toJson(sourceConfig),
+                null,
+                null);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason);
+    }
+
+    private Response registerDefaultSource() {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(source);
+        sourceConfig.setClassName(className);
+        sourceConfig.setParallelism(parallelism);
+        sourceConfig.setTopicName(outputTopic);
+        sourceConfig.setSerdeClassName(outputSerdeClassName);
+        return resource.registerFunction(
+            tenant,
+            namespace,
+                source,
+            mockedInputStream,
+            mockedFormData,
+            null,
+            null,
+            null,
+            new Gson().toJson(sourceConfig),
+                null,
+                null);
+    }
+
+    @Test
+    public void testRegisterExistedSource() throws IOException {
+        Configurator.setRootLevel(Level.DEBUG);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        Response response = registerDefaultSource();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterSourceUploadFailure() throws Exception {
+        mockStatic(Utils.class);
+        doThrow(new IOException("upload failure")).when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+        Response response = registerDefaultSource();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterSourceSuccess() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(true)
+            .setMessage("source registered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = registerDefaultSource();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
+    @Test
+    public void testRegisterSourceFailure() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(false)
+            .setMessage("source failed to register");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = registerDefaultSource();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterSourceInterrupted() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+            new IOException("Function registeration interrupted"));
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = registerDefaultSource();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    //
+    // Update Functions
+    //
+
+    @Test
+    public void testUpdateSourceMissingTenant() throws IOException {
+        testUpdateSourceMissingArguments(
+            null,
+            namespace,
+                source,
+            mockedInputStream,
+            mockedFormData,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Tenant is not provided");
+    }
+
+    @Test
+    public void testUpdateSourceMissingNamespace() throws IOException {
+        testUpdateSourceMissingArguments(
+            tenant,
+            null,
+                source,
+            mockedInputStream,
+            mockedFormData,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Namespace is not provided");
+    }
+
+    @Test
+    public void testUpdateSourceMissingFunctionName() throws IOException {
+        testUpdateSourceMissingArguments(
+            tenant,
+            namespace,
+            null,
+            mockedInputStream,
+            mockedFormData,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Function Name is not provided");
+    }
+
+    @Test
+    public void testUpdateSourceMissingPackage() throws IOException {
+        testUpdateSourceMissingArguments(
+            tenant,
+            namespace,
+                source,
+            null,
+            mockedFormData,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Function Package is not provided");
+    }
+
+    @Test
+    public void testUpdateSourceMissingPackageDetails() throws IOException {
+        testUpdateSourceMissingArguments(
+            tenant,
+            namespace,
+                source,
+            mockedInputStream,
+            null,
+            outputTopic,
+                outputSerdeClassName,
+            className,
+            parallelism,
+                "Function Package is not provided");
+    }
+
+    private void testUpdateSourceMissingArguments(
+            String tenant,
+            String namespace,
+            String function,
+            InputStream inputStream,
+            FormDataContentDisposition details,
+            String outputTopic,
+            String outputSerdeClassName,
+            String className,
+            Integer parallelism,
+            String expectedError) throws IOException {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+        SourceConfig sourceConfig = new SourceConfig();
+        if (tenant != null) {
+            sourceConfig.setTenant(tenant);
+        }
+        if (namespace != null) {
+            sourceConfig.setNamespace(namespace);
+        }
+        if (function != null) {
+            sourceConfig.setName(function);
+        }
+        if (outputTopic != null) {
+            sourceConfig.setTopicName(outputTopic);
+        }
+        if (outputSerdeClassName != null) {
+            sourceConfig.setSerdeClassName(outputSerdeClassName);
+        }
+        if (className != null) {
+            sourceConfig.setClassName(className);
+        }
+        if (parallelism != null) {
+            sourceConfig.setParallelism(parallelism);
+        }
+
+        Response response = resource.updateFunction(
+            tenant,
+            namespace,
+            function,
+            inputStream,
+            details,
+            null,
+            null,
+            null,
+            new Gson().toJson(sourceConfig),
+                null,
+                null);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason);
+    }
+
+    private Response updateDefaultSource() throws IOException {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(source);
+        sourceConfig.setClassName(className);
+        sourceConfig.setParallelism(parallelism);
+        sourceConfig.setTopicName(outputTopic);
+        sourceConfig.setSerdeClassName(outputSerdeClassName);
+
+        return resource.updateFunction(
+            tenant,
+            namespace,
+                source,
+            mockedInputStream,
+            mockedFormData,
+            null,
+            null,
+            null,
+            new Gson().toJson(sourceConfig),
+                null,
+                null);
+    }
+
+    @Test
+    public void testUpdateNotExistedSource() throws IOException {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+        Response response = updateDefaultSource();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testUpdateSourceUploadFailure() throws Exception {
+        mockStatic(Utils.class);
+        doThrow(new IOException("upload failure")).when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        Response response = updateDefaultSource();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testUpdateSourceSuccess() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(true)
+            .setMessage("source registered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = updateDefaultSource();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
+    @Test
+    public void testUpdateSourceWithUrl() throws IOException {
+        Configurator.setRootLevel(Level.DEBUG);
+
+        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String filePackageUrl = "file://" + fileLocation;
+
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTopicName(outputTopic);
+        sourceConfig.setSerdeClassName(outputSerdeClassName);
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(source);
+        sourceConfig.setClassName(className);
+        sourceConfig.setParallelism(parallelism);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+        RequestResult rr = new RequestResult()
+                .setSuccess(true)
+                .setMessage("source registered");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = resource.updateFunction(
+            tenant,
+            namespace,
+                source,
+            null,
+            null,
+            filePackageUrl,
+            null,
+            null,
+            new Gson().toJson(sourceConfig),
+                null,
+                null);
+
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
+    @Test
+    public void testUpdateSourceFailure() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(false)
+            .setMessage("source failed to register");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = updateDefaultSource();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testUpdateSourceInterrupted() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadToBookeeper(
+            any(Namespace.class),
+            any(InputStream.class),
+            anyString());
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+            new IOException("Function registeration interrupted"));
+        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        Response response = updateDefaultSource();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    //
+    // deregister source
+    //
+
+    @Test
+    public void testDeregisterSourceMissingTenant() throws Exception {
+        testDeregisterSourceMissingArguments(
+            null,
+            namespace,
+                source,
+            "Tenant");
+    }
+
+    @Test
+    public void testDeregisterSourceMissingNamespace() throws Exception {
+        testDeregisterSourceMissingArguments(
+            tenant,
+            null,
+                source,
+            "Namespace");
+    }
+
+    @Test
+    public void testDeregisterSourceMissingFunctionName() throws Exception {
+        testDeregisterSourceMissingArguments(
+            tenant,
+            namespace,
+            null,
+            "Function Name");
+    }
+
+    private void testDeregisterSourceMissingArguments(
+        String tenant,
+        String namespace,
+        String function,
+        String missingFieldName
+    ) {
+        Response response = resource.deregisterFunction(
+            tenant,
+            namespace,
+            function,
+            null);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    private Response deregisterDefaultSource() {
+        return resource.deregisterFunction(
+            tenant,
+            namespace,
+                source,
+            null);
+    }
+
+    @Test
+    public void testDeregisterNotExistedSource() {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+        Response response = deregisterDefaultSource();
+        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testDeregisterSourceSuccess() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(true)
+            .setMessage("source deregistered");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
+
+        Response response = deregisterDefaultSource();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(rr.toJson(), response.getEntity());
+    }
+
+    @Test
+    public void testDeregisterSourceFailure() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        RequestResult rr = new RequestResult()
+            .setSuccess(false)
+            .setMessage("source failed to deregister");
+        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
+
+        Response response = deregisterDefaultSource();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testDeregisterSourceInterrupted() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+            new IOException("Function deregisteration interrupted"));
+        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
+
+        Response response = deregisterDefaultSource();
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    // Source Info doesn't exist. Maybe one day they might be added
+    //
+    // Get Function Info
+    //
+
+    /*
+    @Test
+    public void testGetFunctionMissingTenant() throws Exception {
+        testGetFunctionMissingArguments(
+            null,
+            namespace,
+                source,
+            "Tenant");
+    }
+
+    @Test
+    public void testGetFunctionMissingNamespace() throws Exception {
+        testGetFunctionMissingArguments(
+            tenant,
+            null,
+                source,
+            "Namespace");
+    }
+
+    @Test
+    public void testGetFunctionMissingFunctionName() throws Exception {
+        testGetFunctionMissingArguments(
+            tenant,
+            namespace,
+            null,
+            "Function Name");
+    }
+
+    private void testGetFunctionMissingArguments(
+        String tenant,
+        String namespace,
+        String function,
+        String missingFieldName
+    ) throws IOException {
+        Response response = resource.getFunctionInfo(
+            tenant,
+            namespace,
+            function);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    private Response getDefaultFunctionInfo() throws IOException {
+        return resource.getFunctionInfo(
+            tenant,
+            namespace,
+                source);
+    }
+
+    @Test
+    public void testGetNotExistedFunction() throws IOException {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+        Response response = getDefaultFunctionInfo();
+        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testGetFunctionSuccess() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+        SinkSpec sinkSpec = SinkSpec.newBuilder()
+                .setTopic(outputTopic)
+                .setSerDeClassName(outputSerdeClassName).build();
+        FunctionDetails functionDetails = FunctionDetails.newBuilder()
+                .setClassName(className)
+                .setSink(sinkSpec)
+                .setName(source)
+                .setNamespace(namespace)
+                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
+                .setTenant(tenant)
+                .setParallelism(parallelism)
+                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
+                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+        FunctionMetaData metaData = FunctionMetaData.newBuilder()
+            .setCreateTime(System.currentTimeMillis())
+            .setFunctionDetails(functionDetails)
+            .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
+            .setVersion(1234)
+            .build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData);
+
+        Response response = getDefaultFunctionInfo();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            response.getEntity());
+    }
+
+    //
+    // List Functions
+    //
+
+    @Test
+    public void testListFunctionsMissingTenant() throws Exception {
+        testListFunctionsMissingArguments(
+            null,
+            namespace,
+            "Tenant");
+    }
+
+    @Test
+    public void testListFunctionsMissingNamespace() throws Exception {
+        testListFunctionsMissingArguments(
+            tenant,
+            null,
+            "Namespace");
+    }
+
+    private void testListFunctionsMissingArguments(
+        String tenant,
+        String namespace,
+        String missingFieldName
+    ) {
+        Response response = resource.listFunctions(
+            tenant,
+            namespace);
+
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
+    }
+
+    private Response listDefaultFunctions() {
+        return resource.listFunctions(
+            tenant,
+            namespace);
+    }
+
+    @Test
+    public void testListFunctionsSuccess() throws Exception {
+        List<String> functions = Lists.newArrayList("test-1", "test-2");
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+
+        Response response = listDefaultFunctions();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(new Gson().toJson(functions), response.getEntity());
+    }
+    */
+}
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 3dca855181..4716e3cfb5 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -225,23 +225,47 @@ The Apache Software License, Version 2.0
     - jackson-core-asl-1.9.13.jar
     - jackson-databind-2.8.4.jar
     - jackson-mapper-asl-1.9.13.jar
+    - jackson-dataformat-yaml-2.8.4.jar
  * Guava
     - guava-21.0.jar
     - guava-24.1-jre.jar
  * Google Guice
     - guice-4.2.0.jar
     - guice-multibindings-4.2.0.jar
+ * Google Gson
+    - gson-2.8.2.jar
+ * Google Common Protos
+    - proto-google-common-protos-1.0.0.jar
  * Apache Commons
     - commons-math3-3.6.1.jar
     - commons-beanutils-core-1.8.3.jar
-    - commons-beanutils-core-1.8.0.jar
+    - commons-beanutils-1.7.0.jar
     - commons-compress-1.15.jar
     - commons-lang3-3.3.2.jar
     - commons-lang3-3.4.jar
+    - commons-collections-3.2.1.jar
+    - commons-configuration-1.6.jar
+    - commons-digester-1.8.jar
+    - commons-lang-2.4.jar
+    - commons-logging-1.1.1.jar
  * Netty
     - netty-3.6.2.Final.jar
+    - netty-all-4.1.22.Final.jar
+    - netty-buffer-4.1.22.Final.jar
+    - netty-codec-4.1.22.Final.jar
+    - netty-codec-http-4.1.22.Final.jar
+    - netty-codec-http2-4.1.22.Final.jar
+    - netty-codec-socks-4.1.22.Final.jar
+    - netty-common-4.1.22.Final.jar
+    - netty-handler-4.1.22.Final.jar
+    - netty-handler-proxy-4.1.22.Final.jar
+    - netty-resolver-4.1.22.Final.jar
+    - netty-tcnative-boringssl-static-2.0.7.Final.jar
+    - netty-transport-4.1.22.Final.jar
  * Joda Time
-     - joda-time-2.9.9.jar
+    - joda-time-2.9.9.jar
+ * TypeTools
+    - typetools-0.5.0.jar
  * Jetty
     - http2-client-9.4.11.v20180605.jar
     - http2-common-9.4.11.v20180605.jar
@@ -257,6 +281,7 @@ The Apache Software License, Version 2.0
     - jetty-server-9.4.11.v20180605.jar
     - jetty-servlet-9.4.11.v20180605.jar
     - jetty-util-9.4.11.v20180605.jar
+    - jetty-util-9.3.11.v20160721.jar
   * Javassist
     - javassist-3.22.0-CR2.jar
   * Asynchronous Http Client
@@ -300,6 +325,7 @@ The Apache Software License, Version 2.0
     - units-1.0.jar
   * Error Prone Annotations
     - error_prone_annotations-2.1.3.jar
+    - error_prone_annotations-2.1.2.jar
   * Esri Geometry API For Java
     - esri-geometry-api-2.1.0.jar
   * Fastutil
@@ -332,6 +358,7 @@ The Apache Software License, Version 2.0
   * OkHttp
     - okhttp-3.9.0.jar
     - okhttp-urlconnection-3.9.0.jar
+    - okhttp-2.5.0.jar
   * OpenCSV
     - opencsv-2.3.jar
   * Plexus
@@ -357,6 +384,7 @@ The Apache Software License, Version 2.0
     - jcommander-1.48.jar
   * FindBugs JSR305
     - jsr305-3.0.2.jar
+    - jsr305-3.0.0.jar
   * Objenesis
     - objenesis-2.1.jar
     - objenesis-2.6.jar
@@ -393,6 +421,22 @@ The Apache Software License, Version 2.0
     - simpleclient_servlet-0.0.23.jar
   * LZ4
     - lz4-java-1.5.0.jar
+  * Bookkeeper
+    - circe-checksum-4.7.2.jar
+  * GRPC
+    - grpc-all-1.12.0.jar
+    - grpc-auth-1.12.0.jar
+    - grpc-context-1.12.0.jar
+    - grpc-core-1.12.0.jar
+    - grpc-netty-1.12.0.jar
+    - grpc-okhttp-1.12.0.jar
+    - grpc-protobuf-1.12.0.jar
+    - grpc-protobuf-lite-1.12.0.jar
+    - grpc-protobuf-nano-1.12.0.jar
+    - grpc-stub-1.12.0.jar
+  * OpenCensus
+    - opencensus-api-0.11.0.jar
+    - opencensus-contrib-grpc-metrics-0.11.0.jar
 
 Protocol Buffers License
  * Protocol Buffers
@@ -401,6 +445,11 @@ Protocol Buffers License
 
 BSD 3-clause "New" or "Revised" License
   *  RE2J TD -- re2j-td-1.4.jar
+  * google-auth-library-credentials-0.9.0.jar
+  * protobuf-java-util-3.5.1.jar
+
+BSD 2-clause
+  * protobuf-javanano-3.0.0-alpha-5.jar
 
 BSD License
  * ANTLR 4 Runtime -- antlr4-runtime-4.6.jar
diff --git a/src/check-binary-license b/src/check-binary-license
index 9d3e9b31c7..c5d609dd16 100755
--- a/src/check-binary-license
+++ b/src/check-binary-license
@@ -84,7 +84,7 @@ for J in $NOTICEJARS; do
 done
 
 # check pulsar sql jars
-JARS=$(tar -tf $TARBALL | grep '\.jar' | grep 'lib/presto/' | grep -v 'managed-ledger' | grep -v  'pulsar-client-admin' | grep -v 'pulsar-client-schema' | grep -v 'pulsar-functions-api' | grep -v 'pulsar-presto-connector-original' | grep -v 'pulsar-presto-distribution' | sed 's!.*/!!' | sort)
+JARS=$(tar -tf $TARBALL | grep '\.jar' | grep 'lib/presto/' | grep -v 'managed-ledger' | grep -v  'pulsar-client-admin' | grep -v 'pulsar-client-schema' | grep -v 'pulsar-functions-api' | grep -v 'pulsar-presto-connector-original' | grep -v 'pulsar-presto-distribution' | grep -v 'pulsar-common' | grep -v 'pulsar-functions-proto' | grep -v 'pulsar-functions-utils' | grep -v 'pulsar-io-core' | sed 's!.*/!!' | sort)
 LICENSEPATH=$(tar -tf $TARBALL  | awk '/^[^\/]*\/lib\/presto\/LICENSE/')
 LICENSE=$(tar -O -xf $TARBALL "$LICENSEPATH")
 LICENSEJARS=$(echo "$LICENSE" | sed -nE 's!.* (.*\.jar).*!\1!gp')
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index c51b25f45a..bf27859d53 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -66,7 +66,7 @@ public void teardownPresto() {
         pulsarCluster.stopPrestoWorker();;
     }
 
-    @Test
+    @Test(enabled = false)
     public void testSimpleSQLQuery() throws Exception {
 
         @Cleanup


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services