You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/06/24 03:01:43 UTC

[pulsar] branch master updated: Refactoring Function Component implementation (#4541)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95df092  Refactoring Function Component implementation (#4541)
95df092 is described below

commit 95df092832a3633a81fd719a5c096919141cc4f9
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Jun 23 20:01:37 2019 -0700

    Refactoring Function Component implementation (#4541)
    
    * Refactoring Function Component implementation
    
    * cleaning up
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  15 +-
 .../apache/pulsar/broker/admin/impl/SinksBase.java |  21 +-
 .../pulsar/broker/admin/impl/SourcesBase.java      |  20 +-
 .../functions/worker/rest/api/ComponentImpl.java   | 485 +--------------------
 .../functions/worker/rest/api/FunctionsImpl.java   | 394 ++++++++++++++++-
 .../functions/worker/rest/api/FunctionsImplV2.java |  19 +-
 .../functions/worker/rest/api/SinksImpl.java       | 403 ++++++++++++++++-
 .../functions/worker/rest/api/SourcesImpl.java     | 401 ++++++++++++++++-
 .../worker/rest/api/v3/FunctionsApiV3Resource.java |   8 +-
 .../worker/rest/api/v3/SinksApiV3Resource.java     |  21 +-
 .../worker/rest/api/v3/SourcesApiV3Resource.java   |  21 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  14 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |  28 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |  24 +-
 14 files changed, 1290 insertions(+), 584 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 11e20de..c6f38d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -18,7 +18,12 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import io.swagger.annotations.*;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Example;
+import io.swagger.annotations.ExampleProperty;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -163,10 +168,10 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                             )
                     )
             )
-            final @FormDataParam("functionConfig") String functionConfigJson) {
+            final @FormDataParam("functionConfig") FunctionConfig functionConfig) {
 
         functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-            functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData());
+            functionPkgUrl, functionConfig, clientAppId(), clientAuthData());
     }
 
     @PUT
@@ -270,12 +275,12 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                             )
                     )
             )
-            final @FormDataParam("functionConfig") String functionConfigJson,
+            final @FormDataParam("functionConfig") FunctionConfig functionConfig,
             @ApiParam(value = "The update options is for the Pulsar Function that needs to be updated.")
             final @FormDataParam("updateOptions") UpdateOptions updateOptions) throws IOException {
 
         functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData(), updateOptions);
+                functionPkgUrl, functionConfig, clientAppId(), clientAuthData(), updateOptions);
     }
 
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index c527bd3..e21d861 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -77,7 +77,7 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
                              final @PathParam("sinkName") String sinkName,
                              final @FormDataParam("data") InputStream uploadedInputStream,
                              final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                             final @FormDataParam("url") String functionPkgUrl,
+                             final @FormDataParam("url") String sinkPkgUrl,
                              @ApiParam(
                                  value =
                                      "A JSON value presenting a sink config playload. All available configuration options are:  \n" +
@@ -136,10 +136,9 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
                                      )
                                  )
                              )
-                             final @FormDataParam("sinkConfig") String sinkConfigJson) {
-
-        sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData());
+                             final @FormDataParam("sinkConfig") SinkConfig sinkConfig) {
+        sink.registerSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData());
     }
 
     @PUT
@@ -162,7 +161,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
                            final @PathParam("sinkName") String sinkName,
                            final @FormDataParam("data") InputStream uploadedInputStream,
                            final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                           final @FormDataParam("url") String functionPkgUrl,
+                           @ApiParam(value = "URL of sink's archive")
+                           final @FormDataParam("url") String sinkPkgUrl,
                            @ApiParam(
                                value =
                                    "A JSON value presenting a sink config playload. All available configuration options are:  \n" +
@@ -221,12 +221,11 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
                                    )
                                )
                            )
-                           final @FormDataParam("sinkConfig") String sinkConfigJson,
-                           @ApiParam()
+                           final @FormDataParam("sinkConfig") SinkConfig sinkConfig,
+                           @ApiParam(value = "Update options for sink")
                            final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
-
-         sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData(), updateOptions);
+         sink.updateSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData(), updateOptions);
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 4856800..8dc20c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -79,7 +79,7 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
             final @PathParam("sourceName") String sourceName,
             final @FormDataParam("data") InputStream uploadedInputStream,
             final @FormDataParam("data") FormDataContentDisposition fileDetail,
-            final @FormDataParam("url") String functionPkgUrl,
+            final @FormDataParam("url") String sourcePkgUrl,
             @ApiParam(
                     value = "A JSON value presenting source configuration payload. An example of the expected functions can be found here.  \n" +
                             "classname  \n" +
@@ -126,10 +126,9 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
                             )
                     )
             )
-            final @FormDataParam("sourceConfig") String sourceConfigJson) {
-
-        source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-            functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData());
+            final @FormDataParam("sourceConfig") SourceConfig sourceConfig) {
+        source.registerSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+            sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData());
     }
 
     @PUT
@@ -154,7 +153,8 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
             final @PathParam("sourceName") String sourceName,
             final @FormDataParam("data") InputStream uploadedInputStream,
             final @FormDataParam("data") FormDataContentDisposition fileDetail,
-            final @FormDataParam("url") String functionPkgUrl,
+            @ApiParam(value = "URL of sources' archive")
+            final @FormDataParam("url") String sourcePkgUrl,
             @ApiParam(
                     value = "A JSON value presenting source configuration payload. An example of the expected functions can be found here.  \n" +
                             "classname  \n" +
@@ -201,11 +201,11 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
                             )
                     )
             )
-            final @FormDataParam("sourceConfig") String sourceConfigJson,
+            final @FormDataParam("sourceConfig") SourceConfig sourceConfig,
+            @ApiParam(value = "Update options for source")
             final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
-
-        source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-            functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), updateOptions);
+        source.updateSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+            sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData(), updateOptions);
     }
 
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 62ac9bd..345ea56 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.gson.Gson;
-import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -34,7 +31,6 @@ import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -46,19 +42,14 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.FunctionState;
-import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -68,11 +59,9 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.utils.SinkConfigUtils;
-import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -95,12 +84,10 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -111,10 +98,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
 import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
@@ -285,170 +270,10 @@ public abstract class ComponentImpl {
         return true;
     }
 
-    public void registerFunction(final String tenant,
-                                 final String namespace,
-                                 final String componentName,
-                                 final InputStream uploadedInputStream,
-                                 final FormDataContentDisposition fileDetail,
-                                 final String functionPkgUrl,
-                                 final String componentConfigJson,
-                                 final String clientRole,
-                                 AuthenticationDataHttps clientAuthenticationDataHttps) {
-
-        if (!isWorkerServiceAvailable()) {
-            throwUnavailableException();
-        }
-
-        if (tenant == null) {
-            throw new RestException(Status.BAD_REQUEST, "Tenant is not provided");
-        }
-        if (namespace == null) {
-            throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
-        }
-        if (componentName == null) {
-            throw new RestException(Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
-        }
-
-        try {
-            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
-                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
-                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
-            }
-        } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-        }
-
-        try {
-            // Check tenant exists
-            final TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-
-            String qualifiedNamespace = tenant + "/" + namespace;
-            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
-            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
-                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
-                        worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
-                if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
-                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, componentName, namespace);
-                    throw new RestException(Status.BAD_REQUEST, "Namespace does not exist");
-                }
-            }
-        } catch (PulsarAdminException.NotAuthorizedException e) {
-            log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
-                    componentName, clientRole, ComponentTypeUtils.toString(componentType));
-            throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
-        } catch (PulsarAdminException.NotFoundException e) {
-            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, componentName, tenant);
-            throw new RestException(Status.BAD_REQUEST, "Tenant does not exist");
-        } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
-        if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
-            throw new RestException(Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), componentName));
-        }
-
-        FunctionDetails functionDetails = null;
-        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
-        File componentPackageFile = null;
-        try {
-
-            // validate parameters
-            try {
-                if (isPkgUrlProvided) {
-
-                    if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
-                        throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
-                    }
-                    try {
-                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
-                    } catch (Exception e) {
-                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
-                            componentConfigJson, componentType, componentPackageFile);
-                } else {
-                    if (uploadedInputStream != null) {
-                        componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
-                            componentConfigJson, componentType, componentPackageFile);
-                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
-                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-
-            try {
-                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
-            } catch (Exception e) {
-                log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
-                throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
-            }
-
-            // function state
-            FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
-                    .setFunctionDetails(functionDetails)
-                    .setCreateTime(System.currentTimeMillis())
-                    .setVersion(0);
-
-            // cache auth if need
-            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
-
-                if (clientAuthenticationDataHttps != null) {
-                    try {
-                        Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
-                                .getRuntimeFactory()
-                                .getAuthProvider()
-                                .cacheAuthData(tenant, namespace, componentName, clientAuthenticationDataHttps);
-
-                        if (functionAuthData.isPresent()) {
-                            functionMetaDataBuilder.setFunctionAuthSpec(
-                                    Function.FunctionAuthenticationSpec.newBuilder()
-                                            .setData(ByteString.copyFrom(functionAuthData.get().getData()))
-                                            .build());
-                        }
-                    } catch (Exception e) {
-                        log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-
-
-                        throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
-                    }
-                }
-            }
-
-            PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
-            try {
-                packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
-                        functionPkgUrl, fileDetail, componentPackageFile);
-            } catch (Exception e) {
-                log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-            }
-
-            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-            updateRequest(functionMetaDataBuilder.build());
-        } finally {
-
-            if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
-                    && componentPackageFile != null && componentPackageFile.exists()) {
-                componentPackageFile.delete();
-            }
-        }
-    }
-
-    private PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData,
-                                                                       final String functionPkgUrl,
-                                                                       final FormDataContentDisposition fileDetail,
-                                                                       final File uploadedInputStreamAsFile) throws Exception {
+    PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData,
+                                                               final String functionPkgUrl,
+                                                               final FormDataContentDisposition fileDetail,
+                                                               final File uploadedInputStreamAsFile) throws Exception {
         FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
         String tenant = functionDetails.getTenant();
         String namespace = functionDetails.getNamespace();
@@ -512,230 +337,6 @@ public abstract class ComponentImpl {
         return packageLocationMetaDataBuilder;
     }
 
-
-    public void updateFunction(final String tenant,
-                               final String namespace,
-                               final String componentName,
-                               final InputStream uploadedInputStream,
-                               final FormDataContentDisposition fileDetail,
-                               final String functionPkgUrl,
-                               final String componentConfigJson,
-                               final String clientRole,
-                               AuthenticationDataHttps clientAuthenticationDataHttps,
-                               UpdateOptions updateOptions) {
-
-        if (!isWorkerServiceAvailable()) {
-            throwUnavailableException();
-        }
-
-        if (tenant == null) {
-            throw new RestException(Status.BAD_REQUEST, "Tenant is not provided");
-        }
-        if (namespace == null) {
-            throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
-        }
-        if (componentName == null) {
-            throw new RestException(Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
-        }
-
-        try {
-            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
-                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
-                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
-
-            }
-        } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
-        }
-
-        String mergedComponentConfigJson;
-        String existingComponentConfigJson;
-
-        FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
-
-        if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
-        }
-
-        if (componentType.equals(FunctionDetails.ComponentType.FUNCTION)) {
-            FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
-            existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
-            FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precedence over whatever is there in functionconfig
-            functionConfig.setTenant(tenant);
-            functionConfig.setNamespace(namespace);
-            functionConfig.setName(componentName);
-            try {
-                FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(existingFunctionConfig,
-                        functionConfig);
-                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
-            } catch (Exception e) {
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-        } else if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
-            SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
-            existingComponentConfigJson = new Gson().toJson(existingSourceConfig);
-            SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precedence over whatever is there in functionconfig
-            sourceConfig.setTenant(tenant);
-            sourceConfig.setNamespace(namespace);
-            sourceConfig.setName(componentName);
-            try {
-                SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
-                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
-            } catch (Exception e) {
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-        } else {
-            SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
-            existingComponentConfigJson = new Gson().toJson(existingSinkConfig);
-            SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
-            // The rest end points take precedence over whatever is there in functionconfig
-            sinkConfig.setTenant(tenant);
-            sinkConfig.setNamespace(namespace);
-            sinkConfig.setName(componentName);
-            try {
-                SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
-                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
-            } catch (Exception e) {
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-        }
-
-        if (existingComponentConfigJson.equals(mergedComponentConfigJson) && isBlank(functionPkgUrl) && uploadedInputStream == null) {
-            log.error("{}/{}/{} Update contains no changes", tenant, namespace, componentName);
-            throw new RestException(Status.BAD_REQUEST, "Update contains no change");
-        }
-
-        FunctionDetails functionDetails = null;
-        File componentPackageFile = null;
-        try {
-
-            // validate parameters
-            try {
-                if (isNotBlank(functionPkgUrl)) {
-                    try {
-                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
-                    } catch (Exception e) {
-                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
-                            mergedComponentConfigJson, componentType, componentPackageFile);
-
-                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
-                        || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
-                    try {
-                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
-                    } catch (Exception e) {
-                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
-                            mergedComponentConfigJson, componentType, componentPackageFile);
-                } else if (uploadedInputStream != null) {
-
-                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
-                            mergedComponentConfigJson, componentType, componentPackageFile);
-
-                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
-                            mergedComponentConfigJson, componentType, componentPackageFile);
-                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
-                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
-                    }
-                } else {
-
-                    componentPackageFile = FunctionCommon.createPkgTempFile();
-                    componentPackageFile.deleteOnExit();
-                    log.info("componentPackageFile: {}", componentPackageFile);
-                    WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
-
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
-                            mergedComponentConfigJson, componentType, componentPackageFile);
-                }
-            } catch (Exception e) {
-                log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-
-            try {
-                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
-            } catch (Exception e) {
-                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
-                throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
-                        ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
-            }
-
-            // merge from existing metadata
-            FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder().mergeFrom(existingComponent)
-                    .setFunctionDetails(functionDetails);
-
-            // update auth data if need
-            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
-                if (clientAuthenticationDataHttps != null && updateOptions != null && updateOptions.isUpdateAuthData()) {
-                    // get existing auth data if it exists
-                    Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
-                    if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
-                        existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
-                    }
-
-                    try {
-                        Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
-                                .getRuntimeFactory()
-                                .getAuthProvider()
-                                .updateAuthData(
-                                        tenant, namespace,
-                                        componentName, existingFunctionAuthData,
-                                        clientAuthenticationDataHttps);
-
-                        if (newFunctionAuthData.isPresent()) {
-                            functionMetaDataBuilder.setFunctionAuthSpec(
-                                    Function.FunctionAuthenticationSpec.newBuilder()
-                                            .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
-                                            .build());
-                        } else {
-                            functionMetaDataBuilder.clearFunctionAuthSpec();
-                        }
-                    } catch (Exception e) {
-                        log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-                        throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
-                    }
-                }
-            }
-
-            PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
-            if (isNotBlank(functionPkgUrl) || uploadedInputStream != null) {
-                try {
-                    packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
-                            functionPkgUrl, fileDetail, componentPackageFile);
-                } catch (Exception e) {
-                    log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-                    throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-                }
-            } else {
-                packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
-            }
-
-            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-
-            updateRequest(functionMetaDataBuilder.build());
-        } finally {
-            if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
-                    && componentPackageFile != null && componentPackageFile.exists()) {
-                componentPackageFile.delete();
-            }
-        }
-    }
-
     public void deregisterFunction(final String tenant,
                                    final String namespace,
                                    final String componentName,
@@ -1269,7 +870,7 @@ public abstract class ComponentImpl {
         return retVals;
     }
 
-    private void updateRequest(final FunctionMetaData functionMetaData) {
+    void updateRequest(final FunctionMetaData functionMetaData) {
 
         // Submit to FMT
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
@@ -1708,80 +1309,6 @@ public abstract class ComponentImpl {
         return null;
     }
 
-private FunctionDetails validateUpdateRequestParams(final String tenant,
-                                                    final String namespace,
-                                                    final String componentName,
-                                                    final String componentConfigJson,
-                                                    final FunctionDetails.ComponentType componentType,
-                                                    final File componentPackageFile) throws IOException {
-        if (tenant == null) {
-            throw new IllegalArgumentException("Tenant is not provided");
-        }
-        if (namespace == null) {
-            throw new IllegalArgumentException("Namespace is not provided");
-        }
-        if (componentName == null) {
-            throw new IllegalArgumentException(String.format("%s Name is not provided", ComponentTypeUtils.toString(componentType)));
-        }
-
-        if (componentType.equals(FunctionDetails.ComponentType.FUNCTION) && !isEmpty(componentConfigJson)) {
-            FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precedence over whatever is there in functionconfig
-            functionConfig.setTenant(tenant);
-            functionConfig.setNamespace(namespace);
-            functionConfig.setName(componentName);
-            FunctionConfigUtils.inferMissingArguments(functionConfig);
-            ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
-            return FunctionConfigUtils.convert(functionConfig, clsLoader);
-        }
-        if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
-            Path archivePath = null;
-            SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precedence over whatever is there in sourceconfig
-            sourceConfig.setTenant(tenant);
-            sourceConfig.setNamespace(namespace);
-            sourceConfig.setName(componentName);
-            org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
-            if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
-                String builtinArchive = sourceConfig.getArchive();
-                if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
-                    builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
-                }
-                try {
-                    archivePath = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(String.format("No Source archive %s found", archivePath));
-                }
-            }
-            SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, componentPackageFile);
-            return SourceConfigUtils.convert(sourceConfig, sourceDetails);
-        }
-        if (componentType.equals(FunctionDetails.ComponentType.SINK)) {
-            Path archivePath = null;
-            SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
-            // The rest end points take precedence over whatever is there in sinkConfig
-            sinkConfig.setTenant(tenant);
-            sinkConfig.setNamespace(namespace);
-            sinkConfig.setName(componentName);
-            org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
-            if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
-                String builtinArchive = sinkConfig.getArchive();
-                if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
-                    builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
-                }
-                try {
-                    archivePath = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(String.format("No Sink archive %s found", archivePath));
-                }
-            }
-            SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
-            return SinkConfigUtils.convert(sinkConfig, sinkDetails);
-        } else {
-            throw new IllegalArgumentException("Unrecognized component type: " + ComponentTypeUtils.toString(componentType));
-        }
-    }
-
     private void validateTriggerRequestParams(final String tenant,
                                               final String namespace,
                                               final String functionName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index bf09fbf..da86189 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -18,27 +18,401 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
+import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
 @Slf4j
 public class FunctionsImpl extends ComponentImpl {
 
+    public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
+        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.FUNCTION);
+    }
+
+    public void registerFunction(final String tenant,
+                                 final String namespace,
+                                 final String functionName,
+                                 final InputStream uploadedInputStream,
+                                 final FormDataContentDisposition fileDetail,
+                                 final String functionPkgUrl,
+                                 final FunctionConfig functionConfig,
+                                 final String clientRole,
+                                 AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (functionName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        functionName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        try {
+            // Check tenant exists
+            worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+                        worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, functionName, namespace);
+                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+                }
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+                    functionName, clientRole, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, functionName, tenant);
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, functionName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+            log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName);
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), functionName));
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isPkgUrlProvided) {
+
+                    if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
+                        throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
+                    }
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            functionConfig, componentPackageFile);
+                } else {
+                    if (uploadedInputStream != null) {
+                        componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            functionConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+            }
+
+            // function state
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+                    .setFunctionDetails(functionDetails)
+                    .setCreateTime(System.currentTimeMillis())
+                    .setVersion(0);
+
+            // cache auth if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+                if (clientAuthenticationDataHttps != null) {
+                    try {
+                        Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .cacheAuthData(tenant, namespace, functionName, clientAuthenticationDataHttps);
+
+                        if (functionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                            .build());
+                        }
+                    } catch (Exception e) {
+                        log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+
+
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            try {
+                packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                        functionPkgUrl, fileDetail, componentPackageFile);
+            } catch (Exception e) {
+                log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+
+            if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
+    public void updateFunction(final String tenant,
+                               final String namespace,
+                               final String functionName,
+                               final InputStream uploadedInputStream,
+                               final FormDataContentDisposition fileDetail,
+                               final String functionPkgUrl,
+                               final FunctionConfig functionConfig,
+                               final String clientRole,
+                               AuthenticationDataHttps clientAuthenticationDataHttps,
+                               UpdateOptions updateOptions) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (functionName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+                        functionName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), functionName));
+        }
+
+        Function.FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
+
+        if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, functionName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), functionName));
+        }
+
+        FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+        // The rest end points take precedence over whatever is there in function config
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        FunctionConfig mergedConfig;
+        try {
+            mergedConfig = FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig);
+        } catch (Exception e) {
+            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+        }
+
+        if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, namespace, functionName);
+            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isNotBlank(functionPkgUrl)) {
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
+                        || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+                } else if (uploadedInputStream != null) {
+
+                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                } else {
+
+                    componentPackageFile = FunctionCommon.createPkgTempFile();
+                    componentPackageFile.deleteOnExit();
+                    log.info("componentPackageFile: {}", componentPackageFile);
+                    WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
+
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+                }
+            } catch (Exception e) {
+                log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+                        ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+            }
+
+            // merge from existing metadata
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+                    .setFunctionDetails(functionDetails);
+
+            // update auth data if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+                if (clientAuthenticationDataHttps != null && updateOptions != null && updateOptions.isUpdateAuthData()) {
+                    // get existing auth data if it exists
+                    Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
+                    if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+                        existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+                    }
+
+                    try {
+                        Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .updateAuthData(
+                                        tenant, namespace,
+                                        functionName, existingFunctionAuthData,
+                                        clientAuthenticationDataHttps);
+
+                        if (newFunctionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+                                            .build());
+                        } else {
+                            functionMetaDataBuilder.clearFunctionAuthSpec();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            if (isNotBlank(functionPkgUrl) || uploadedInputStream != null) {
+                try {
+                    packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                            functionPkgUrl, fileDetail, componentPackageFile);
+                } catch (Exception e) {
+                    log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+                }
+            } else {
+                packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+            if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
     private class GetFunctionStatus extends GetStatus<FunctionStatus, FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> {
 
         @Override
@@ -195,10 +569,6 @@ public class FunctionsImpl extends ComponentImpl {
         return exceptionInformation;
     }
 
-    public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.FUNCTION);
-    }
-
     /**
      * Get status of a function instance.  If this worker is not running the function instance,
      * @param tenant the tenant the function belongs to
@@ -262,4 +632,20 @@ public class FunctionsImpl extends ComponentImpl {
 
         return functionStatus;
     }
+
+    private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                                 final String namespace,
+                                                                 final String componentName,
+                                                                 final FunctionConfig functionConfig,
+                                                                 final File componentPackageFile) throws IOException {
+
+        // The rest end points take precedence over whatever is there in function config
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(componentName);
+        FunctionConfigUtils.inferMissingArguments(functionConfig);
+        ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+        return FunctionConfigUtils.convert(functionConfig, clsLoader);
+
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index b1da329..d4e6414 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -18,14 +18,12 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.FunctionState;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -103,14 +101,9 @@ public class FunctionsImplV2 {
             throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
         }
         FunctionConfig functionConfig = FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
-        String functionConfigJson = null;
-        try {
-            functionConfigJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig);
-        } catch (JsonProcessingException e) {
-            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
-        }
+
         delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientRole, null);
+                functionPkgUrl, functionConfig, clientRole, null);
         return Response.ok().build();
     }
 
@@ -125,15 +118,9 @@ public class FunctionsImplV2 {
             throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
         }
         FunctionConfig functionConfig = FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
-        String functionConfigJson = null;
-        try {
-            functionConfigJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig);
-        } catch (JsonProcessingException e) {
-            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
-        }
 
         delegate.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientRole, null, null);
+                functionPkgUrl, functionConfig, clientRole, null, null);
         return Response.ok().build();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index fa07160..504722f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -18,34 +18,406 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
 import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
 public class SinksImpl extends ComponentImpl {
 
+    public SinksImpl(Supplier<WorkerService> workerServiceSupplier) {
+        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SINK);
+    }
+
+    public void registerSink(final String tenant,
+                             final String namespace,
+                             final String sinkName,
+                             final InputStream uploadedInputStream,
+                             final FormDataContentDisposition fileDetail,
+                             final String sinkPkgUrl,
+                             final SinkConfig sinkConfig,
+                             final String clientRole,
+                             AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (sinkName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        sinkName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sinkName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        try {
+            // Check tenant exists
+            worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+                        worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, sinkName, namespace);
+                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+                }
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+                    sinkName, clientRole, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, sinkName, tenant);
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, sinkName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
+            log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sinkName));
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        boolean isPkgUrlProvided = isNotBlank(sinkPkgUrl);
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isPkgUrlProvided) {
+
+                    if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
+                        throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
+                    }
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            sinkConfig, componentPackageFile);
+                } else {
+                    if (uploadedInputStream != null) {
+                        componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            sinkConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+            }
+
+            // function state
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+                    .setFunctionDetails(functionDetails)
+                    .setCreateTime(System.currentTimeMillis())
+                    .setVersion(0);
+
+            // cache auth if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+                if (clientAuthenticationDataHttps != null) {
+                    try {
+                        Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .cacheAuthData(tenant, namespace, sinkName, clientAuthenticationDataHttps);
+
+                        if (functionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                            .build());
+                        }
+                    } catch (Exception e) {
+                        log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+
+
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            try {
+                packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                        sinkPkgUrl, fileDetail, componentPackageFile);
+            } catch (Exception e) {
+                log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+
+            if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
+    public void updateSink(final String tenant,
+                           final String namespace,
+                           final String sinkName,
+                           final InputStream uploadedInputStream,
+                           final FormDataContentDisposition fileDetail,
+                           final String sinkPkgUrl,
+                           final SinkConfig sinkConfig,
+                           final String clientRole,
+                           AuthenticationDataHttps clientAuthenticationDataHttps,
+                           UpdateOptions updateOptions) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (sinkName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+                        sinkName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sinkName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
+        }
+
+        Function.FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, sinkName);
+
+        if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, sinkName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
+        }
+
+
+        SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+        // The rest end points take precedence over whatever is there in functionconfig
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sinkName);
+
+        SinkConfig mergedConfig;
+        try {
+            mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
+        } catch (Exception e) {
+            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+        }
+
+
+        if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, namespace, sinkName);
+            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isNotBlank(sinkPkgUrl)) {
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
+                        || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+                } else if (uploadedInputStream != null) {
+
+                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                } else {
+
+                    componentPackageFile = FunctionCommon.createPkgTempFile();
+                    componentPackageFile.deleteOnExit();
+                    log.info("componentPackageFile: {}", componentPackageFile);
+                    WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
+
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+                }
+            } catch (Exception e) {
+                log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+                        ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+            }
+
+            // merge from existing metadata
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+                    .setFunctionDetails(functionDetails);
+
+            // update auth data if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+                if (clientAuthenticationDataHttps != null && updateOptions != null && updateOptions.isUpdateAuthData()) {
+                    // get existing auth data if it exists
+                    Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
+                    if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+                        existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+                    }
+
+                    try {
+                        Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .updateAuthData(
+                                        tenant, namespace,
+                                        sinkName, existingFunctionAuthData,
+                                        clientAuthenticationDataHttps);
+
+                        if (newFunctionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+                                            .build());
+                        } else {
+                            functionMetaDataBuilder.clearFunctionAuthSpec();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            if (isNotBlank(sinkPkgUrl) || uploadedInputStream != null) {
+                try {
+                    packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                            sinkPkgUrl, fileDetail, componentPackageFile);
+                } catch (Exception e) {
+                    log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+                }
+            } else {
+                packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+            if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
     private class GetSinkStatus extends GetStatus<SinkStatus, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
 
         @Override
@@ -206,10 +578,6 @@ public class SinksImpl extends ComponentImpl {
         return exceptionInformation;
     }
 
-    public SinksImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SINK);
-    }
-
     public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant,
                                                                                       final String namespace,
                                                                                       final String sinkName,
@@ -287,4 +655,31 @@ public class SinksImpl extends ComponentImpl {
         SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
         return config;
     }
+
+    private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                                 final String namespace,
+                                                                 final String sinkName,
+                                                                 final SinkConfig sinkConfig,
+                                                                 final File componentPackageFile) throws IOException {
+
+        Path archivePath = null;
+        // The rest end points take precedence over whatever is there in sinkConfig
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sinkName);
+        org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
+        if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
+            String builtinArchive = sinkConfig.getArchive();
+            if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+            }
+            try {
+                archivePath = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
+            } catch (Exception e) {
+                throw new IllegalArgumentException(String.format("No Sink archive %s found", archivePath));
+            }
+        }
+        SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
+        return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index d35724e..15a26aa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -18,33 +18,403 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
 import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
 public class SourcesImpl extends ComponentImpl {
+
+    public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
+        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE);
+    }
+
+    public void registerSource(final String tenant,
+                               final String namespace,
+                               final String sourceName,
+                               final InputStream uploadedInputStream,
+                               final FormDataContentDisposition fileDetail,
+                               final String sourcePkgUrl,
+                               final SourceConfig sourceConfig,
+                               final String clientRole,
+                               AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (sourceName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sourceName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        try {
+            // Check tenant exists
+            worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+                        worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, sourceName, namespace);
+                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+                }
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+                    sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, sourceName, tenant);
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, sourceName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, sourceName)) {
+            log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName);
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sourceName));
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        boolean isPkgUrlProvided = isNotBlank(sourcePkgUrl);
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isPkgUrlProvided) {
+
+                    if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
+                        throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
+                    }
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            sourceConfig, componentPackageFile);
+                } else {
+                    if (uploadedInputStream != null) {
+                        componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            sourceConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+            }
+
+            // function state
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+                    .setFunctionDetails(functionDetails)
+                    .setCreateTime(System.currentTimeMillis())
+                    .setVersion(0);
+
+            // cache auth if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+                if (clientAuthenticationDataHttps != null) {
+                    try {
+                        Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .cacheAuthData(tenant, namespace, sourceName, clientAuthenticationDataHttps);
+
+                        if (functionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                            .build());
+                        }
+                    } catch (Exception e) {
+                        log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+
+
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            try {
+                packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                        sourcePkgUrl, fileDetail, componentPackageFile);
+            } catch (Exception e) {
+                log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+
+            if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
+    public void updateSource(final String tenant,
+                               final String namespace,
+                               final String sourceName,
+                               final InputStream uploadedInputStream,
+                               final FormDataContentDisposition fileDetail,
+                               final String sourcePkgUrl,
+                               final SourceConfig sourceConfig,
+                               final String clientRole,
+                               AuthenticationDataHttps clientAuthenticationDataHttps,
+                               UpdateOptions updateOptions) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (sourceName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+                        sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sourceName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, sourceName)) {
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sourceName));
+        }
+
+        Function.FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, sourceName);
+
+        if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, sourceName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sourceName));
+        }
+
+        SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+        // The rest end points take precedence over whatever is there in functionconfig
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(sourceName);
+        SourceConfig mergedConfig;
+        try {
+            mergedConfig = SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
+        } catch (Exception e) {
+            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+        }
+
+        if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) && uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, namespace, sourceName);
+            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isNotBlank(sourcePkgUrl)) {
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
+                        || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+                } else if (uploadedInputStream != null) {
+
+                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                } else {
+
+                    componentPackageFile = FunctionCommon.createPkgTempFile();
+                    componentPackageFile.deleteOnExit();
+                    log.info("componentPackageFile: {}", componentPackageFile);
+                    WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
+
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+                }
+            } catch (Exception e) {
+                log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+                        ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+            }
+
+            // merge from existing metadata
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+                    .setFunctionDetails(functionDetails);
+
+            // update auth data if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+                if (clientAuthenticationDataHttps != null && updateOptions != null && updateOptions.isUpdateAuthData()) {
+                    // get existing auth data if it exists
+                    Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
+                    if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+                        existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+                    }
+
+                    try {
+                        Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .updateAuthData(
+                                        tenant, namespace,
+                                        sourceName, existingFunctionAuthData,
+                                        clientAuthenticationDataHttps);
+
+                        if (newFunctionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+                                            .build());
+                        } else {
+                            functionMetaDataBuilder.clearFunctionAuthSpec();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            if (isNotBlank(sourcePkgUrl) || uploadedInputStream != null) {
+                try {
+                    packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                            sourcePkgUrl, fileDetail, componentPackageFile);
+                } catch (Exception e) {
+                    log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+                }
+            } else {
+                packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+            if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
     private class GetSourceStatus extends GetStatus<SourceStatus, SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
 
         @Override
@@ -208,10 +578,6 @@ public class SourcesImpl extends ComponentImpl {
         }
     }
 
-    public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE);
-    }
-
     public SourceStatus getSourceStatus(final String tenant,
                                         final String namespace,
                                         final String componentName,
@@ -285,4 +651,31 @@ public class SourcesImpl extends ComponentImpl {
         SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
         return config;
     }
+
+    private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                                 final String namespace,
+                                                                 final String sourceName,
+                                                                 final SourceConfig sourceConfig,
+                                                                 final File sourcePackageFile) throws IOException {
+
+        Path archivePath = null;
+        // The rest end points take precedence over whatever is there in sourceconfig
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(sourceName);
+        org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
+        if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
+            String builtinArchive = sourceConfig.getArchive();
+            if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+            }
+            try {
+                archivePath = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
+            } catch (Exception e) {
+                throw new IllegalArgumentException(String.format("No Source archive %s found", archivePath));
+            }
+        }
+        SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePackageFile);
+        return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 2f1a87f..0bbacaa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -67,10 +67,10 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
                                  final @FormDataParam("data") InputStream uploadedInputStream,
                                  final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                  final @FormDataParam("url") String functionPkgUrl,
-                                 final @FormDataParam("functionConfig") String functionConfigJson) {
+                                 final @FormDataParam("functionConfig") FunctionConfig functionConfig) {
 
         functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData());
+                functionPkgUrl, functionConfig, clientAppId(), clientAuthData());
 
     }
 
@@ -83,11 +83,11 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
                                final @FormDataParam("data") InputStream uploadedInputStream,
                                final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                final @FormDataParam("url") String functionPkgUrl,
-                               final @FormDataParam("functionConfig") String functionConfigJson,
+                               final @FormDataParam("functionConfig") FunctionConfig functionConfig,
                                final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
 
         functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData(), updateOptions);
+                functionPkgUrl, functionConfig, clientAppId(), clientAuthData(), updateOptions);
 
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
index e699544..d9f788b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
@@ -33,7 +33,14 @@ import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.io.InputStream;
@@ -62,10 +69,10 @@ public class SinksApiV3Resource extends FunctionApiResource {
                              final @FormDataParam("data") InputStream uploadedInputStream,
                              final @FormDataParam("data") FormDataContentDisposition fileDetail,
                              final @FormDataParam("url") String functionPkgUrl,
-                             final @FormDataParam("sinkConfig") String sinkConfigJson) {
+                             final @FormDataParam("sinkConfig") SinkConfig sinkConfig) {
 
-        sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData());
+        sink.registerSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                functionPkgUrl, sinkConfig, clientAppId(), clientAuthData());
     }
 
     @PUT
@@ -77,11 +84,11 @@ public class SinksApiV3Resource extends FunctionApiResource {
                            final @FormDataParam("data") InputStream uploadedInputStream,
                            final @FormDataParam("data") FormDataContentDisposition fileDetail,
                            final @FormDataParam("url") String functionPkgUrl,
-                           final @FormDataParam("sinkConfig") String sinkConfigJson,
+                           final @FormDataParam("sinkConfig") SinkConfig sinkConfig,
                            final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
 
-        sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData(), updateOptions);
+        sink.updateSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                functionPkgUrl, sinkConfig, clientAppId(), clientAuthData(), updateOptions);
     }
 
     @DELETE
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
index de6df1a..9be1581 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
@@ -33,7 +33,14 @@ import org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.io.InputStream;
@@ -62,10 +69,10 @@ public class SourcesApiV3Resource extends FunctionApiResource {
                                    final @FormDataParam("data") InputStream uploadedInputStream,
                                    final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                    final @FormDataParam("url") String functionPkgUrl,
-                                   final @FormDataParam("sourceConfig") String sourceConfigJson) {
+                                   final @FormDataParam("sourceConfig") SourceConfig sourceConfig) {
 
-        source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData());
+        source.registerSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+                functionPkgUrl, sourceConfig, clientAppId(), clientAuthData());
 
     }
 
@@ -78,11 +85,11 @@ public class SourcesApiV3Resource extends FunctionApiResource {
                              final @FormDataParam("data") InputStream uploadedInputStream,
                              final @FormDataParam("data") FormDataContentDisposition fileDetail,
                              final @FormDataParam("url") String functionPkgUrl,
-                             final @FormDataParam("sourceConfig") String sourceConfigJson,
+                             final @FormDataParam("sourceConfig") SourceConfig sourceConfig,
                              final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
 
-        source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), updateOptions);
+        source.updateSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+                functionPkgUrl, sourceConfig, clientAppId(), clientAuthData(), updateOptions);
     }
 
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 434cfc2..b3b673b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -484,7 +484,7 @@ public class FunctionApiV3ResourceTest {
                 inputStream,
                 details,
                 functionPkgUrl,
-                new Gson().toJson(functionConfig),
+                functionConfig,
                 null, null);
 
     }
@@ -498,7 +498,7 @@ public class FunctionApiV3ResourceTest {
             mockedInputStream,
             mockedFormData,
             null,
-            new Gson().toJson(functionConfig),
+            functionConfig,
                 null, null);
     }
 
@@ -912,7 +912,7 @@ public class FunctionApiV3ResourceTest {
             inputStream,
             details,
             null,
-            new Gson().toJson(functionConfig),
+            functionConfig,
                 null, null, null);
 
     }
@@ -936,7 +936,7 @@ public class FunctionApiV3ResourceTest {
             mockedInputStream,
             mockedFormData,
             null,
-            new Gson().toJson(functionConfig),
+            functionConfig,
                 null, null, null);
     }
 
@@ -1024,7 +1024,7 @@ public class FunctionApiV3ResourceTest {
             null,
             null,
             filePackageUrl,
-            new Gson().toJson(functionConfig),
+            functionConfig,
                 null, null, null);
 
     }
@@ -1469,7 +1469,7 @@ public class FunctionApiV3ResourceTest {
         functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
-        resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, new Gson().toJson(functionConfig), null, null);
+        resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, functionConfig, null, null);
 
     }
 
@@ -1500,7 +1500,7 @@ public class FunctionApiV3ResourceTest {
         functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
-        resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl, new Gson().toJson(functionConfig), null, null);
+        resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl, functionConfig, null, null);
     }
 
     public static FunctionConfig createDefaultFunctionConfig() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 3b3548a..c884835 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -434,28 +434,28 @@ public class SinkApiV3ResourceTest {
             sinkConfig.setParallelism(parallelism);
         }
 
-        resource.registerFunction(
+        resource.registerSink(
                 tenant,
                 namespace,
                 sink,
                 inputStream,
                 details,
                 pkgUrl,
-                new Gson().toJson(sinkConfig),
+                sinkConfig,
                 null, null);
 
     }
 
     private void registerDefaultSink() throws IOException {
         SinkConfig sinkConfig = createDefaultSinkConfig();
-        resource.registerFunction(
+        resource.registerSink(
             tenant,
             namespace,
                 sink,
                 new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
-            new Gson().toJson(sinkConfig),
+            sinkConfig,
                 null, null);
     }
 
@@ -548,14 +548,14 @@ public class SinkApiV3ResourceTest {
         sinkConfig.setClassName(className);
         sinkConfig.setParallelism(parallelism);
         sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
-        resource.registerFunction(
+        resource.registerSink(
                 actualTenant,
                 actualNamespace,
                 actualName,
                 new FileInputStream(JAR_FILE_PATH),
                 mockedFormData,
                 null,
-                new Gson().toJson(sinkConfig),
+                sinkConfig,
                 null, null);
     }
 
@@ -829,14 +829,14 @@ public class SinkApiV3ResourceTest {
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
         }
 
-        resource.updateFunction(
+        resource.updateSink(
             tenant,
             namespace,
             sink,
             inputStream,
             details,
             null,
-            new Gson().toJson(sinkConfig),
+            sinkConfig,
                 null, null, null);
 
     }
@@ -870,14 +870,14 @@ public class SinkApiV3ResourceTest {
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
 
-        resource.updateFunction(
+        resource.updateSink(
             tenant,
             namespace,
                 sink,
                 new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
-            new Gson().toJson(sinkConfig),
+            sinkConfig,
                 null, null, null);
     }
 
@@ -972,14 +972,14 @@ public class SinkApiV3ResourceTest {
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        resource.updateFunction(
+        resource.updateSink(
             tenant,
             namespace,
                 sink,
             null,
             null,
             filePackageUrl,
-            new Gson().toJson(sinkConfig),
+            sinkConfig,
                 null, null, null);
     }
 
@@ -1364,7 +1364,7 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace does not exist")
-    public void testRegisterFunctionNonExistingNamespace() throws Exception {
+    public void testregisterSinkNonExistingNamespace() throws Exception {
         try {
             this.namespaceList.clear();
             registerDefaultSink();
@@ -1375,7 +1375,7 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant does not exist")
-    public void testRegisterFunctionNonExistingTenant() throws Exception {
+    public void testregisterSinkNonExistingTenant() throws Exception {
         try {
             when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
             registerDefaultSink();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index cd55369..875e2ef 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -404,28 +404,28 @@ public class SourceApiV3ResourceTest {
             sourceConfig.setParallelism(parallelism);
         }
 
-        resource.registerFunction(
+        resource.registerSource(
                 tenant,
                 namespace,
                 function,
                 inputStream,
                 details,
                 pkgUrl,
-                new Gson().toJson(sourceConfig),
+                sourceConfig,
                 null, null);
 
     }
 
     private void registerDefaultSource() throws IOException {
         SourceConfig sourceConfig = createDefaultSourceConfig();
-        resource.registerFunction(
+        resource.registerSource(
             tenant,
             namespace,
                 source,
             new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
-            new Gson().toJson(sourceConfig),
+            sourceConfig,
                 null, null);
     }
 
@@ -518,14 +518,14 @@ public class SourceApiV3ResourceTest {
         sourceConfig.setParallelism(parallelism);
         sourceConfig.setTopicName(outputTopic);
         sourceConfig.setSerdeClassName(outputSerdeClassName);
-        resource.registerFunction(
+        resource.registerSource(
                 actualTenant,
                 actualNamespace,
                 actualName,
                 new FileInputStream(JAR_FILE_PATH),
                 mockedFormData,
                 null,
-                new Gson().toJson(sourceConfig),
+                sourceConfig,
                 null, null);
     }
 
@@ -850,14 +850,14 @@ public class SourceApiV3ResourceTest {
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
         }
 
-        resource.updateFunction(
+        resource.updateSource(
             tenant,
             namespace,
             function,
             inputStream,
             details,
             null,
-            new Gson().toJson(sourceConfig),
+            sourceConfig,
                 null, null, null);
 
     }
@@ -887,14 +887,14 @@ public class SourceApiV3ResourceTest {
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
 
-        resource.updateFunction(
+        resource.updateSource(
             tenant,
             namespace,
                 source,
                 new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
-            new Gson().toJson(sourceConfig),
+            sourceConfig,
                 null, null, null);
     }
 
@@ -988,14 +988,14 @@ public class SourceApiV3ResourceTest {
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        resource.updateFunction(
+        resource.updateSource(
             tenant,
             namespace,
                 source,
             null,
             null,
             filePackageUrl,
-            new Gson().toJson(sourceConfig),
+            sourceConfig,
                 null, null, null);
 
     }