You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/04 00:33:57 UTC

[GitHub] srkukarni closed pull request #2701: Function Serverside Validation

srkukarni closed pull request #2701: Function Serverside Validation
URL: https://github.com/apache/pulsar/pull/2701
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the full diff is reproduced below for the sake of
provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 80f2df552b..cff3c34651 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -80,10 +80,11 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
                                      final @FormDataParam("data") InputStream uploadedInputStream,
                                      final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                      final @FormDataParam("url") String functionPkgUrl,
-                                     final @FormDataParam("functionDetails") String functionDetailsJson) {
+                                     final @FormDataParam("functionDetails") String functionDetailsJson,
+                                     final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
     }
 
     @PUT
@@ -101,10 +102,11 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("data") InputStream uploadedInputStream,
                                    final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                    final @FormDataParam("url") String functionPkgUrl,
-                                   final @FormDataParam("functionDetails") String functionDetailsJson) {
+                                   final @FormDataParam("functionDetails") String functionDetailsJson,
+                                   final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
 
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 9e3c6e8ece..c771e7b5d4 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -45,7 +45,6 @@
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
-import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
 import java.util.Arrays;
@@ -84,10 +83,7 @@
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator;
@@ -229,8 +225,8 @@ void processArguments() throws Exception {
                 description = "Path to the main Python file/Python Wheel file for the function (if the function is written in Python)",
                 listConverter = StringConverter.class)
         protected String pyFile;
-        @Parameter(names = { "-i",
-                "--inputs" }, description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)")
+        @Parameter(names = {"-i",
+                "--inputs"}, description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)")
         protected String inputs;
         // for backwards compatibility purposes
         @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)", hidden = true)
@@ -326,6 +322,8 @@ void processArguments() throws Exception {
         protected String deadLetterTopic;
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
+        // The classLoader associated with this function defn
+        protected ClassLoader classLoader;
 
 
         private void mergeArgs() {
@@ -379,12 +377,12 @@ void processArguments() throws Exception {
                 functionConfig.setInputs(inputTopics);
             }
             if (null != customSerdeInputString) {
-                Type type = new TypeToken<Map<String, String>>(){}.getType();
+                Type type = new TypeToken<Map<String, String>>() {}.getType();
                 Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
                 functionConfig.setCustomSerdeInputs(customSerdeInputMap);
             }
             if (null != customSchemaInputString) {
-                Type type = new TypeToken<Map<String, String>>(){}.getType();
+                Type type = new TypeToken<Map<String, String>>() {}.getType();
                 Map<String, String> customschemaInputMap = new Gson().fromJson(customSchemaInputString, type);
                 functionConfig.setCustomSchemaInputs(customschemaInputMap);
             }
@@ -412,13 +410,13 @@ void processArguments() throws Exception {
             }
 
             functionConfig.setRetainOrdering(retainOrdering);
-            
+
             if (isNotBlank(subsName)) {
                 functionConfig.setSubName(subsName);
             }
 
             if (null != userConfigString) {
-                Type type = new TypeToken<Map<String, String>>(){}.getType();
+                Type type = new TypeToken<Map<String, String>>() {}.getType();
                 Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
                 functionConfig.setUserConfig(userConfigMap);
             }
@@ -490,6 +488,9 @@ void processArguments() throws Exception {
 
             // infer default vaues
             inferMissingArguments(functionConfig);
+
+            // check if configs are valid
+            validateFunctionConfigs(functionConfig);
         }
 
         protected void validateFunctionConfigs(FunctionConfig functionConfig) {
@@ -532,26 +533,22 @@ protected void validateFunctionConfigs(FunctionConfig functionConfig) {
 
                 if (jarFilePath != null) {
                     File file = new File(jarFilePath);
-                    ClassLoader userJarLoader;
                     try {
-                        userJarLoader = Reflections.loadJar(file);
+                        classLoader = Reflections.loadJar(file);
                     } catch (MalformedURLException e) {
                         throw new ParameterException(
                                 "Failed to load user jar " + file + " with error " + e.getMessage());
                     }
-                    // make sure the function class loader is accessible thread-locally
-                    Thread.currentThread().setContextClassLoader(userJarLoader);
-
                     (new ImplementsClassesValidator(Function.class, java.util.function.Function.class))
-                            .validateField("className", functionConfig.getClassName());
+                            .validateField("className", functionConfig.getClassName(), classLoader);
                 }
             }
 
             try {
                 // Need to load jar and set context class loader before calling
-                ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name());
+                ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), classLoader);
             } catch (Exception e) {
-                throw new ParameterException(e.getMessage());
+                throw new IllegalArgumentException(e.getMessage());
             }
         }
 
@@ -588,7 +585,7 @@ private void inferMissingFunctionName(FunctionConfig functionConfig) {
                 throw new ParameterException("You must specify a class name for the function");
             }
 
-            String [] domains = functionConfig.getClassName().split("\\.");
+            String[] domains = functionConfig.getClassName().split("\\.");
             if (domains.length == 0) {
                 functionConfig.setName(functionConfig.getClassName());
             } else {
@@ -603,183 +600,6 @@ private void inferMissingTenant(FunctionConfig functionConfig) {
         private void inferMissingNamespace(FunctionConfig functionConfig) {
             functionConfig.setNamespace(DEFAULT_NAMESPACE);
         }
-
-        protected FunctionDetails convert(FunctionConfig functionConfig)
-                throws IOException {
-
-            // check if configs are valid
-            validateFunctionConfigs(functionConfig);
-
-            Class<?>[] typeArgs = null;
-            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-                if (functionConfig.getJar().startsWith(Utils.FILE)) {
-                    // server derives the arg-type by loading a class
-                    if (isBlank(functionConfig.getClassName())) {
-                        throw new ParameterException("Class-name must be present for jar with file-url");
-                    }
-                } else {
-                    typeArgs = Utils.getFunctionTypes(functionConfig);
-                }
-            }
-
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-
-            // Setup source
-            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            if (functionConfig.getInputs() != null) {
-                functionConfig.getInputs().forEach((topicName -> {
-                    sourceSpecBuilder.putInputSpecs(topicName,
-                            ConsumerSpec.newBuilder()
-                                    .setIsRegexPattern(false)
-                                    .build());
-                }));
-            }
-            if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
-                sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(),
-                        ConsumerSpec.newBuilder()
-                                .setIsRegexPattern(true)
-                                .build());
-            }
-            if (functionConfig.getCustomSerdeInputs() != null) {
-                functionConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> {
-                    sourceSpecBuilder.putInputSpecs(topicName,
-                            ConsumerSpec.newBuilder()
-                                    .setSerdeClassName(serdeClassName)
-                                    .setIsRegexPattern(false)
-                                    .build());
-                });
-            }
-            if (functionConfig.getCustomSchemaInputs() != null) {
-                functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> {
-                    sourceSpecBuilder.putInputSpecs(topicName,
-                            ConsumerSpec.newBuilder()
-                                    .setSchemaType(schemaType)
-                                    .setIsRegexPattern(false)
-                                    .build());
-                });
-            }
-            if (functionConfig.getInputSpecs() != null) {
-                functionConfig.getInputSpecs().forEach((topicName, consumerConf) -> {
-                    ConsumerSpec.Builder bldr = ConsumerSpec.newBuilder()
-                            .setIsRegexPattern(consumerConf.isRegexPattern());
-                    if (!StringUtils.isBlank(consumerConf.getSchemaType())) {
-                        bldr.setSchemaType(consumerConf.getSchemaType());
-                    } else if (!StringUtils.isBlank(consumerConf.getSerdeClassName())) {
-                        bldr.setSerdeClassName(consumerConf.getSerdeClassName());
-                    }
-                    sourceSpecBuilder.putInputSpecs(topicName, bldr.build());
-                });
-            }
-
-            // Set subscription type based on ordering and EFFECTIVELY_ONCE semantics
-            SubscriptionType subType = (functionConfig.isRetainOrdering()
-                    || ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees()))
-                            ? SubscriptionType.FAILOVER
-                            : SubscriptionType.SHARED;
-            sourceSpecBuilder.setSubscriptionType(subType);
-            
-            if (isNotBlank(functionConfig.getSubName())) {
-                sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName());
-            }
-
-            if (typeArgs != null) {
-                sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
-            }
-            if (functionConfig.getTimeoutMs() != null) {
-                sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
-            }
-            functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-            // Setup sink
-            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            if (functionConfig.getOutput() != null) {
-                sinkSpecBuilder.setTopic(functionConfig.getOutput());
-            }
-            if (!StringUtils.isBlank(functionConfig.getOutputSerdeClassName())) {
-                sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
-            }
-            if (!StringUtils.isBlank(functionConfig.getOutputSchemaType())) {
-                sinkSpecBuilder.setSchemaType(functionConfig.getOutputSchemaType());
-            }
-
-            if (typeArgs != null) {
-                sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
-            }
-            functionDetailsBuilder.setSink(sinkSpecBuilder);
-
-            if (functionConfig.getTenant() != null) {
-                functionDetailsBuilder.setTenant(functionConfig.getTenant());
-            }
-            if (functionConfig.getNamespace() != null) {
-                functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
-            }
-            if (functionConfig.getName() != null) {
-                functionDetailsBuilder.setName(functionConfig.getName());
-            }
-            if (functionConfig.getLogTopic() != null) {
-                functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
-            }
-            if (functionConfig.getRuntime() != null) {
-                functionDetailsBuilder.setRuntime(Utils.convertRuntime(functionConfig.getRuntime()));
-            }
-            if (functionConfig.getProcessingGuarantees() != null) {
-                functionDetailsBuilder.setProcessingGuarantees(
-                        Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
-            }
-
-            if (functionConfig.getMaxMessageRetries() >= 0) {
-                RetryDetails.Builder retryBuilder = RetryDetails.newBuilder();
-                retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
-                if (isNotEmpty(functionConfig.getDeadLetterTopic())) {
-                    retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
-                }
-                functionDetailsBuilder.setRetryDetails(retryBuilder);
-            }
-
-            Map<String, Object> configs = new HashMap<>();
-            configs.putAll(functionConfig.getUserConfig());
-
-            // windowing related
-            WindowConfig windowConfig = functionConfig.getWindowConfig();
-            if (windowConfig != null) {
-                windowConfig.setActualWindowFunctionClassName(functionConfig.getClassName());
-                configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig);
-                // set class name to window function executor
-                functionDetailsBuilder.setClassName(WindowFunctionExecutor.class.getName());
-
-            } else {
-                if (functionConfig.getClassName() != null) {
-                    functionDetailsBuilder.setClassName(functionConfig.getClassName());
-                }
-            }
-            if (!configs.isEmpty()) {
-                functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
-            }
-
-            functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
-            functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
-            if (functionConfig.getResources() != null) {
-                Resources.Builder bldr = Resources.newBuilder();
-                if (functionConfig.getResources().getCpu() != null) {
-                    bldr.setCpu(functionConfig.getResources().getCpu());
-                }
-                if (functionConfig.getResources().getRam() != null) {
-                    bldr.setRam(functionConfig.getResources().getRam());
-                }
-                if (functionConfig.getResources().getDisk() != null) {
-                    bldr.setDisk(functionConfig.getResources().getDisk());
-                }
-                functionDetailsBuilder.setResources(bldr.build());
-            }
-            return functionDetailsBuilder.build();
-        }
-
-        protected org.apache.pulsar.functions.proto.Function.FunctionDetails convertProto2(FunctionConfig functionConfig)
-                throws IOException {
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(convert(functionConfig)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
-        }
     }
 
     @Parameters(commandDescription = "Run the Pulsar Function locally (rather than deploying it to the Pulsar cluster)")
@@ -848,7 +668,7 @@ private void mergeArgs() {
         void runCmd() throws Exception {
             // merge deprecated args with new args
             mergeArgs();
-            CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(),
+            CmdFunctions.startLocalRun(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getParallelism(),
                     instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
                     AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
                             .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
@@ -864,9 +684,9 @@ void runCmd() throws Exception {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) {
-                admin.functions().createFunctionWithUrl(convert(functionConfig), functionConfig.getJar());
+                admin.functions().createFunctionWithUrl(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getJar());
             } else {
-                admin.functions().createFunction(convert(functionConfig), userCodeFile);
+                admin.functions().createFunction(FunctionConfigUtils.convert(functionConfig, classLoader), userCodeFile);
             }
 
             print("Created successfully");
@@ -956,9 +776,9 @@ void runCmd() throws Exception {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) {
-                admin.functions().updateFunctionWithUrl(convert(functionConfig), functionConfig.getJar());
+                admin.functions().updateFunctionWithUrl(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getJar());
             } else {
-                admin.functions().updateFunction(convert(functionConfig), userCodeFile);
+                admin.functions().updateFunction(FunctionConfigUtils.convert(functionConfig, classLoader), userCodeFile);
             }
             print("Updated successfully");
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 8f9eefea94..bd5af791a8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -22,7 +22,6 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.fileExists;
 import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
 
@@ -34,14 +33,11 @@
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -56,20 +52,8 @@
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.proto.Function.Resources;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.proto.Function.SubscriptionType;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.SinkConfig;
-import org.apache.pulsar.functions.utils.ConsumerConfig;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
@@ -203,9 +187,9 @@ protected String validateSinkType(String sinkType) throws IOException {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
-                admin.functions().createFunctionWithUrl(createSinkConfig(sinkConfig), sinkConfig.getArchive());
+                admin.functions().createFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
             } else {
-                admin.functions().createFunction(createSinkConfig(sinkConfig), sinkConfig.getArchive());
+                admin.functions().createFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
             }
             print("Created successfully");
         }
@@ -216,9 +200,9 @@ void runCmd() throws Exception {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
-                admin.functions().updateFunctionWithUrl(createSinkConfig(sinkConfig), sinkConfig.getArchive());
+                admin.functions().updateFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
             } else {
-                admin.functions().updateFunction(createSinkConfig(sinkConfig), sinkConfig.getArchive());
+                admin.functions().updateFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
             }
             print("Updated successfully");
         }
@@ -413,6 +397,9 @@ void processArguments() throws Exception {
             }
             
             inferMissingArguments(sinkConfig);
+
+            // check if configs are valid
+            validateSinkConfigs(sinkConfig);
         }
 
         protected Map<String, Object> parseConfigs(String str) {
@@ -462,6 +449,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
             }
 
             // if jar file is present locally then load jar and validate SinkClass in it
+            ClassLoader classLoader = null;
             if (archivePath != null) {
                 if (!fileExists(archivePath)) {
                     throw new ParameterException("Archive file " + archivePath + " does not exist");
@@ -470,17 +458,20 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
                 try {
                     ConnectorDefinition connector = ConnectorUtils.getConnectorDefinition(archivePath);
                     log.info("Connector: {}", connector);
-
-                    // Validate sink class
-                    ConnectorUtils.getIOSinkClass(archivePath);
                 } catch (IOException e) {
                     throw new ParameterException("Connector from " + archivePath + " has error: " + e.getMessage());
                 }
+
+                try {
+                    classLoader = NarClassLoader.getFromArchive(new File(archivePath), Collections.emptySet());
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(e);
+                }
             }
 
             try {
                 // Need to load jar and set context class loader before calling
-                ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name());
+                ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), classLoader);
             } catch (Exception e) {
                 throw new ParameterException(e.getMessage());
             }
@@ -491,152 +482,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
                 throws IOException {
             org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
                     = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(createSinkConfig(sinkConfig)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
-        }
-
-        protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOException {
-
-            // check if configs are valid
-            validateSinkConfigs(sinkConfig);
-
-            String sinkClassName = null;
-            String typeArg = null;
-
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-
-            boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN);
-
-            if (!isBuiltin) {
-                if (sinkConfig.getArchive().startsWith(Utils.FILE)) {
-                    if (isBlank(sinkConfig.getClassName())) {
-                        throw new ParameterException("Class-name must be present for archive with file-url");
-                    }
-                    sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class
-                } else {
-                    sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
-                    try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()),
-                            Collections.emptySet())) {
-                        typeArg = Utils.getSinkType(sinkClassName, ncl).getName();
-                    }
-                }
-            }
-
-            if (sinkConfig.getTenant() != null) {
-                functionDetailsBuilder.setTenant(sinkConfig.getTenant());
-            }
-            if (sinkConfig.getNamespace() != null) {
-                functionDetailsBuilder.setNamespace(sinkConfig.getNamespace());
-            }
-            if (sinkConfig.getName() != null) {
-                functionDetailsBuilder.setName(sinkConfig.getName());
-            }
-            functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
-            functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
-            functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
-            if (sinkConfig.getProcessingGuarantees() != null) {
-                functionDetailsBuilder.setProcessingGuarantees(
-                        convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
-            }
-
-            // set source spec
-            // source spec classname should be empty so that the default pulsar source will be used
-            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
-            if (sinkConfig.getInputs() !=  null) {
-                sinkConfig.getInputs().forEach(topicName ->
-                        sourceSpecBuilder.putInputSpecs(topicName,
-                        ConsumerSpec.newBuilder()
-                                .setIsRegexPattern(false)
-                                .build()));
-            }
-            if (!StringUtils.isEmpty(sinkConfig.getTopicsPattern())) {
-                sourceSpecBuilder.putInputSpecs(sinkConfig.getTopicsPattern(),
-                        ConsumerSpec.newBuilder()
-                                .setIsRegexPattern(true)
-                                .build());
-            }
-            if (sinkConfig.getTopicToSerdeClassName() != null) {
-                sinkConfig.getTopicToSerdeClassName().forEach((topicName, serde) -> {
-                    sourceSpecBuilder.putInputSpecs(topicName,
-                            ConsumerSpec.newBuilder()
-                                    .setSerdeClassName(serde == null ? "" : serde)
-                                    .setIsRegexPattern(false)
-                                    .build());
-                    });
-            }
-            if (sinkConfig.getTopicToSchemaType() != null) {
-                sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
-                    sourceSpecBuilder.putInputSpecs(topicName,
-                            ConsumerSpec.newBuilder()
-                                    .setSchemaType(schemaType == null ? "" : schemaType)
-                                    .setIsRegexPattern(false)
-                                    .build());
-                });
-            }
-            if (sinkConfig.getInputSpecs() != null) {
-                sinkConfig.getInputSpecs().forEach((topic, spec) -> {
-                    sourceSpecBuilder.putInputSpecs(topic,
-                            ConsumerSpec.newBuilder()
-                                    .setSerdeClassName(spec.getSerdeClassName() != null ? spec.getSerdeClassName() : "")
-                                    .setSchemaType(spec.getSchemaType() != null ? spec.getSchemaType() : "")
-                                    .setIsRegexPattern(spec.isRegexPattern())
-                                    .build());
-                });
-            }
-
-            if (typeArg != null) {
-                sourceSpecBuilder.setTypeClassName(typeArg);
-            }
-            if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
-                sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
-            }
-
-            SubscriptionType subType = (sinkConfig.isRetainOrdering()
-                    || ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
-                            ? SubscriptionType.FAILOVER
-                            : SubscriptionType.SHARED;
-            sourceSpecBuilder.setSubscriptionType(subType);
-
-            functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
-            if (sinkConfig.getTimeoutMs() != null) {
-                sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
-            }
-            
-            functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-            // set up sink spec
-            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            if (sinkClassName != null) {
-                sinkSpecBuilder.setClassName(sinkClassName);
-            }
-
-            if (isBuiltin) {
-                String builtin = sinkConfig.getArchive().replaceFirst("^builtin://", "");
-                sinkSpecBuilder.setBuiltin(builtin);
-            }
-
-            if (sinkConfig.getConfigs() != null) {
-                sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs()));
-            }
-            if (typeArg != null) {
-                sinkSpecBuilder.setTypeClassName(typeArg);
-            }
-            functionDetailsBuilder.setSink(sinkSpecBuilder);
-
-            if (sinkConfig.getResources() != null) {
-                Resources.Builder bldr = Resources.newBuilder();
-                if (sinkConfig.getResources().getCpu() != null) {
-                    bldr.setCpu(sinkConfig.getResources().getCpu());
-                }
-                if (sinkConfig.getResources().getRam() != null) {
-                    bldr.setRam(sinkConfig.getResources().getRam());
-                }
-                if (sinkConfig.getResources().getDisk() != null) {
-                    bldr.setDisk(sinkConfig.getResources().getDisk());
-                }
-                functionDetailsBuilder.setResources(bldr.build());
-            }
+            Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig)), functionDetailsBuilder);
             return functionDetailsBuilder.build();
         }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index fb32a6a02e..5d0c84c90d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -20,9 +20,7 @@
 
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.fileExists;
-import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
 
 import com.beust.jcommander.Parameter;
@@ -33,7 +31,6 @@
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.nio.file.Paths;
@@ -52,14 +49,10 @@
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.proto.Function.Resources;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.SourceConfig;
+import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
@@ -194,9 +187,9 @@ protected String validateSourceType(String sourceType) throws IOException {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) {
-                admin.functions().createFunctionWithUrl(createSourceConfig(sourceConfig), sourceConfig.getArchive());
+                admin.functions().createFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
             } else {
-                admin.functions().createFunction(createSourceConfig(sourceConfig), sourceConfig.getArchive());
+                admin.functions().createFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
             }
             print("Created successfully");
         }
@@ -207,9 +200,9 @@ void runCmd() throws Exception {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
-                admin.functions().updateFunctionWithUrl(createSourceConfig(sourceConfig), sourceConfig.getArchive());
+                admin.functions().updateFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
             } else {
-                admin.functions().updateFunction(createSourceConfig(sourceConfig), sourceConfig.getArchive());
+                admin.functions().updateFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
             }
             print("Updated successfully");
         }
@@ -357,6 +350,9 @@ void processArguments() throws Exception {
             }
 
             inferMissingArguments(sourceConfig);
+
+            // check if source configs are valid
+            validateSourceConfigs(sourceConfig);
         }
 
         protected Map<String, Object> parseConfigs(String str) {
@@ -407,6 +403,7 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) {
 
 
             // if jar file is present locally then load jar and validate SinkClass in it
+            ClassLoader classLoader = null;
             if (archivePath != null) {
                 if (!fileExists(archivePath)) {
                     throw new ParameterException("Archive file " + archivePath + " does not exist");
@@ -415,17 +412,21 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) {
                 try {
                     ConnectorDefinition connector = ConnectorUtils.getConnectorDefinition(archivePath);
                     log.info("Connector: {}", connector);
-
-                    // Validate source class
-                    ConnectorUtils.getIOSourceClass(archivePath);
                 } catch (IOException e) {
                     throw new ParameterException("Connector from " + archivePath + " has error: " + e.getMessage());
                 }
+
+                try {
+                    classLoader = NarClassLoader.getFromArchive(new File(archivePath),
+                            Collections.emptySet());
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(e);
+                }
             }
 
             try {
              // Need to load jar and set context class loader before calling
-                ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name());
+                ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), classLoader);
             } catch (Exception e) {
                 throw new ParameterException(e.getMessage());
             }
@@ -435,108 +436,7 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) {
                 throws IOException {
             org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
                     = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(createSourceConfig(sourceConfig)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
-        }
-
-        protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) throws IOException {
-
-            // check if source configs are valid
-            validateSourceConfigs(sourceConfig);
-
-            String sourceClassName = null;
-            String typeArg = null;
-
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-
-            boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN);
-
-            if (!isBuiltin) {
-                if (sourceConfig.getArchive().startsWith(Utils.FILE)) {
-                    if (StringUtils.isBlank(sourceConfig.getClassName())) {
-                        throw new ParameterException("Class-name must be present for archive with file-url");
-                    }
-                    sourceClassName = sourceConfig.getClassName(); // server derives the arg-type by loading a class
-                } else {
-                    sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
-
-                    try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()),
-                            Collections.emptySet())) {
-                        typeArg = getSourceType(sourceClassName, ncl).getName();
-                    }
-                }
-            }
-
-            if (sourceConfig.getTenant() != null) {
-                functionDetailsBuilder.setTenant(sourceConfig.getTenant());
-            }
-            if (sourceConfig.getNamespace() != null) {
-                functionDetailsBuilder.setNamespace(sourceConfig.getNamespace());
-            }
-            if (sourceConfig.getName() != null) {
-                functionDetailsBuilder.setName(sourceConfig.getName());
-            }
-            functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
-            functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
-            functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
-            functionDetailsBuilder.setAutoAck(true);
-            if (sourceConfig.getProcessingGuarantees() != null) {
-                functionDetailsBuilder.setProcessingGuarantees(
-                        convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
-            }
-
-            // set source spec
-            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            if (sourceClassName != null) {
-                sourceSpecBuilder.setClassName(sourceClassName);
-            }
-
-            if (isBuiltin) {
-                String builtin = sourceConfig.getArchive().replaceFirst("^builtin://", "");
-                sourceSpecBuilder.setBuiltin(builtin);
-            }
-
-            if (sourceConfig.getConfigs() != null) {
-                sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs()));
-            }
-
-            if (typeArg != null) {
-                sourceSpecBuilder.setTypeClassName(typeArg);
-            }
-            functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-            // set up sink spec.
-            // Sink spec classname should be empty so that the default pulsar sink will be used
-            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
-                sinkSpecBuilder.setSchemaType(sourceConfig.getSchemaType());
-            }
-            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
-                sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
-            }
-
-            sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
-
-            if (typeArg != null) {
-                sinkSpecBuilder.setTypeClassName(typeArg);
-            }
-
-            functionDetailsBuilder.setSink(sinkSpecBuilder);
-
-            if (sourceConfig.getResources() != null) {
-                Resources.Builder bldr = Resources.newBuilder();
-                if (sourceConfig.getResources().getCpu() != null) {
-                    bldr.setCpu(sourceConfig.getResources().getCpu());
-                }
-                if (sourceConfig.getResources().getRam() != null) {
-                    bldr.setRam(sourceConfig.getResources().getRam());
-                }
-                if (sourceConfig.getResources().getDisk() != null) {
-                    bldr.setDisk(sourceConfig.getResources().getDisk());
-                }
-                functionDetailsBuilder.setResources(bldr.build());
-            }
-
+            Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig)), functionDetailsBuilder);
             return functionDetailsBuilder.build();
         }
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index fb13d39913..fc0f180bc1 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -435,7 +435,7 @@ public void testMissingArchive() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Connector from .*.pulsar-io-twitter.nar has error: The 'twitter' connector does not provide a sink implementation")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "The 'twitter' connector does not provide a sink implementation")
     public void testInvalidJarWithNoSource() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
         sinkConfig.setArchive(WRONG_JAR_PATH);
@@ -783,7 +783,7 @@ public void testCmdSinkConfigFileInvalidJar() throws Exception {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Connector from .*.pulsar-io-twitter.nar has error: The 'twitter' connector does not provide a sink implementation")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "The 'twitter' connector does not provide a sink implementation")
     public void testCmdSinkConfigFileInvalidJarNoSink() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
         testSinkConfig.setArchive(WRONG_JAR_PATH);
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index fbd51519f8..6548f2d73b 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -355,7 +355,7 @@ public void testInvalidJar() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Connector from .*.pulsar-io-cassandra.nar has error: The 'cassandra' connector does not provide a source implementation")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract source class from archive")
     public void testInvalidJarWithNoSource() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         sourceConfig.setArchive(WRONG_JAR_PATH);
@@ -650,7 +650,7 @@ public void testCmdSourceConfigFileInvalidJar() throws Exception {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Connector from .*.pulsar-io-cassandra.nar has error: The 'cassandra' connector does not provide a source implementation")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract source class from archive")
     public void testCmdSourceConfigFileInvalidJarNoSource() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setArchive(WRONG_JAR_PATH);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index dc368126f4..d2b94e3639 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
@@ -98,7 +99,7 @@
      */
     private String outputSchemaType;
 
-    @isImplementationOfClass(implementsClass = SerDe.class)
+    @ConfigValidationAnnotations.isValidSerde
     private String outputSerdeClassName;
     @isValidTopicName
     private String logTopic;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
new file mode 100644
index 0000000000..d546830701
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
+public class FunctionConfigUtils {
+
+    /**
+     * Converts a {@link FunctionConfig} into its {@link FunctionDetails} protobuf form.
+     * Type class names are resolved only for the JAVA runtime with a non-null {@code classLoader}.
+     * Null serde/schema values in the custom input maps are written as empty strings, because
+     * protobuf builder setters throw on null.
+     */
+    public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
+            throws IllegalArgumentException {
+
+        Class<?>[] typeArgs = null;
+        if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && classLoader != null) {
+            typeArgs = Utils.getFunctionTypes(functionConfig, classLoader);
+        }
+
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+
+        // Setup source
+        Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
+        if (functionConfig.getInputs() != null) {
+            functionConfig.getInputs().forEach(topicName ->
+                    sourceSpecBuilder.putInputSpecs(topicName,
+                            Function.ConsumerSpec.newBuilder()
+                                    .setIsRegexPattern(false)
+                                    .build()));
+        }
+        if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
+            sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(),
+                    Function.ConsumerSpec.newBuilder()
+                            .setIsRegexPattern(true)
+                            .build());
+        }
+        if (functionConfig.getCustomSerdeInputs() != null) {
+            functionConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> {
+                sourceSpecBuilder.putInputSpecs(topicName,
+                        Function.ConsumerSpec.newBuilder()
+                                .setSerdeClassName(serdeClassName == null ? "" : serdeClassName)
+                                .setIsRegexPattern(false)
+                                .build());
+            });
+        }
+        if (functionConfig.getCustomSchemaInputs() != null) {
+            functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> {
+                sourceSpecBuilder.putInputSpecs(topicName,
+                        Function.ConsumerSpec.newBuilder()
+                                .setSchemaType(schemaType == null ? "" : schemaType)
+                                .setIsRegexPattern(false)
+                                .build());
+            });
+        }
+        if (functionConfig.getInputSpecs() != null) {
+            functionConfig.getInputSpecs().forEach((topicName, consumerConf) -> {
+                Function.ConsumerSpec.Builder bldr = Function.ConsumerSpec.newBuilder()
+                        .setIsRegexPattern(consumerConf.isRegexPattern());
+                if (isNotBlank(consumerConf.getSchemaType())) {
+                    bldr.setSchemaType(consumerConf.getSchemaType());
+                } else if (isNotBlank(consumerConf.getSerdeClassName())) {
+                    bldr.setSerdeClassName(consumerConf.getSerdeClassName());
+                }
+                sourceSpecBuilder.putInputSpecs(topicName, bldr.build());
+            });
+        }
+
+        // Set subscription type based on ordering and EFFECTIVELY_ONCE semantics
+        Function.SubscriptionType subType = (functionConfig.isRetainOrdering()
+                || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees()))
+                ? Function.SubscriptionType.FAILOVER
+                : Function.SubscriptionType.SHARED;
+        sourceSpecBuilder.setSubscriptionType(subType);
+
+        if (isNotBlank(functionConfig.getSubName())) {
+            sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName());
+        }
+
+        if (typeArgs != null) {
+            sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
+        }
+        if (functionConfig.getTimeoutMs() != null) {
+            sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
+        }
+        functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+        // Setup sink
+        Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
+        if (functionConfig.getOutput() != null) {
+            sinkSpecBuilder.setTopic(functionConfig.getOutput());
+        }
+        if (isNotBlank(functionConfig.getOutputSerdeClassName())) {
+            sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
+        }
+        if (isNotBlank(functionConfig.getOutputSchemaType())) {
+            sinkSpecBuilder.setSchemaType(functionConfig.getOutputSchemaType());
+        }
+
+        if (typeArgs != null) {
+            sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
+        }
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+        if (functionConfig.getTenant() != null) {
+            functionDetailsBuilder.setTenant(functionConfig.getTenant());
+        }
+        if (functionConfig.getNamespace() != null) {
+            functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
+        }
+        if (functionConfig.getName() != null) {
+            functionDetailsBuilder.setName(functionConfig.getName());
+        }
+        if (functionConfig.getLogTopic() != null) {
+            functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
+        }
+        if (functionConfig.getRuntime() != null) {
+            functionDetailsBuilder.setRuntime(Utils.convertRuntime(functionConfig.getRuntime()));
+        }
+        if (functionConfig.getProcessingGuarantees() != null) {
+            functionDetailsBuilder.setProcessingGuarantees(
+                    Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
+        }
+
+        if (functionConfig.getMaxMessageRetries() >= 0) {
+            Function.RetryDetails.Builder retryBuilder = Function.RetryDetails.newBuilder();
+            retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
+            if (isNotEmpty(functionConfig.getDeadLetterTopic())) {
+                retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
+            }
+            functionDetailsBuilder.setRetryDetails(retryBuilder);
+        }
+
+        Map<String, Object> configs = new HashMap<>();
+        if (functionConfig.getUserConfig() != null) {
+            configs.putAll(functionConfig.getUserConfig());
+        }
+
+        // windowing related; note this writes the class name onto the WindowConfig returned by functionConfig
+        WindowConfig windowConfig = functionConfig.getWindowConfig();
+        if (windowConfig != null) {
+            windowConfig.setActualWindowFunctionClassName(functionConfig.getClassName());
+            configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig);
+            // set class name to window function executor
+            functionDetailsBuilder.setClassName("org.apache.pulsar.functions.windowing.WindowFunctionExecutor");
+        } else if (functionConfig.getClassName() != null) {
+            functionDetailsBuilder.setClassName(functionConfig.getClassName());
+        }
+        if (!configs.isEmpty()) {
+            functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
+        }
+
+        functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
+        functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
+        if (functionConfig.getResources() != null) {
+            Function.Resources.Builder bldr = Function.Resources.newBuilder();
+            if (functionConfig.getResources().getCpu() != null) {
+                bldr.setCpu(functionConfig.getResources().getCpu());
+            }
+            if (functionConfig.getResources().getRam() != null) {
+                bldr.setRam(functionConfig.getResources().getRam());
+            }
+            if (functionConfig.getResources().getDisk() != null) {
+                bldr.setDisk(functionConfig.getResources().getDisk());
+            }
+            functionDetailsBuilder.setResources(bldr.build());
+        }
+        return functionDetailsBuilder.build();
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
new file mode 100644
index 0000000000..d44ea301a4
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
+
+public class SinkConfigUtils {
+
+    /**
+     * Converts a {@link SinkConfig} into its {@link FunctionDetails} protobuf form.
+     * For an archive starting with {@code Utils.FILE} the class name must be set in the config; for
+     * any other non-builtin archive the sink class and its type argument are read from the local NAR.
+     */
+    public static FunctionDetails convert(SinkConfig sinkConfig) throws IOException {
+        String sinkClassName = null;
+        String typeArg = null;
+
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+
+        boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN);
+
+        if (!isBuiltin) {
+            if (sinkConfig.getArchive().startsWith(Utils.FILE)) {
+                if (isBlank(sinkConfig.getClassName())) {
+                    throw new IllegalArgumentException("Class-name must be present for archive with file-url");
+                }
+                sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class
+            } else {
+                sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
+                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()),
+                        Collections.emptySet())) {
+                    typeArg = Utils.getSinkType(sinkClassName, ncl).getName();
+                }
+            }
+        }
+
+        if (sinkConfig.getTenant() != null) {
+            functionDetailsBuilder.setTenant(sinkConfig.getTenant());
+        }
+        if (sinkConfig.getNamespace() != null) {
+            functionDetailsBuilder.setNamespace(sinkConfig.getNamespace());
+        }
+        if (sinkConfig.getName() != null) {
+            functionDetailsBuilder.setName(sinkConfig.getName());
+        }
+        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+        functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
+        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+        if (sinkConfig.getProcessingGuarantees() != null) {
+            functionDetailsBuilder.setProcessingGuarantees(
+                    convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
+        }
+
+        // set source spec
+        // source spec classname should be empty so that the default pulsar source will be used
+        Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
+        if (sinkConfig.getInputs() != null) {
+            sinkConfig.getInputs().forEach(topicName ->
+                    sourceSpecBuilder.putInputSpecs(topicName,
+                            Function.ConsumerSpec.newBuilder()
+                                    .setIsRegexPattern(false)
+                                    .build()));
+        }
+        if (!StringUtils.isEmpty(sinkConfig.getTopicsPattern())) {
+            sourceSpecBuilder.putInputSpecs(sinkConfig.getTopicsPattern(),
+                    Function.ConsumerSpec.newBuilder()
+                            .setIsRegexPattern(true)
+                            .build());
+        }
+        if (sinkConfig.getTopicToSerdeClassName() != null) {
+            sinkConfig.getTopicToSerdeClassName().forEach((topicName, serde) ->
+                    sourceSpecBuilder.putInputSpecs(topicName,
+                            Function.ConsumerSpec.newBuilder()
+                                    .setSerdeClassName(serde == null ? "" : serde)
+                                    .setIsRegexPattern(false)
+                                    .build()));
+        }
+        if (sinkConfig.getTopicToSchemaType() != null) {
+            sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) ->
+                    sourceSpecBuilder.putInputSpecs(topicName,
+                            Function.ConsumerSpec.newBuilder()
+                                    .setSchemaType(schemaType == null ? "" : schemaType)
+                                    .setIsRegexPattern(false)
+                                    .build()));
+        }
+        if (sinkConfig.getInputSpecs() != null) {
+            sinkConfig.getInputSpecs().forEach((topic, spec) ->
+                    sourceSpecBuilder.putInputSpecs(topic,
+                            Function.ConsumerSpec.newBuilder()
+                                    .setSerdeClassName(spec.getSerdeClassName() != null ? spec.getSerdeClassName() : "")
+                                    .setSchemaType(spec.getSchemaType() != null ? spec.getSchemaType() : "")
+                                    .setIsRegexPattern(spec.isRegexPattern())
+                                    .build()));
+        }
+
+        if (typeArg != null) {
+            sourceSpecBuilder.setTypeClassName(typeArg);
+        }
+        if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
+            sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
+        }
+
+        Function.SubscriptionType subType = (sinkConfig.isRetainOrdering()
+                || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
+                ? Function.SubscriptionType.FAILOVER
+                : Function.SubscriptionType.SHARED;
+        sourceSpecBuilder.setSubscriptionType(subType);
+
+        functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
+        if (sinkConfig.getTimeoutMs() != null) {
+            sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
+        }
+
+        functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+        // set up sink spec
+        Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
+        if (sinkClassName != null) {
+            sinkSpecBuilder.setClassName(sinkClassName);
+        }
+
+        if (isBuiltin) {
+            String builtin = sinkConfig.getArchive().replaceFirst("^builtin://", "");
+            sinkSpecBuilder.setBuiltin(builtin);
+        }
+
+        if (sinkConfig.getConfigs() != null) {
+            sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs()));
+        }
+        if (typeArg != null) {
+            sinkSpecBuilder.setTypeClassName(typeArg);
+        }
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+        if (sinkConfig.getResources() != null) {
+            Function.Resources.Builder bldr = Function.Resources.newBuilder();
+            if (sinkConfig.getResources().getCpu() != null) {
+                bldr.setCpu(sinkConfig.getResources().getCpu());
+            }
+            if (sinkConfig.getResources().getRam() != null) {
+                bldr.setRam(sinkConfig.getResources().getRam());
+            }
+            if (sinkConfig.getResources().getDisk() != null) {
+                bldr.setDisk(sinkConfig.getResources().getDisk());
+            }
+            functionDetailsBuilder.setResources(bldr.build());
+        }
+        return functionDetailsBuilder.build();
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
new file mode 100644
index 0000000000..73b331ad9d
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.Utils.getSourceType;
+
+public class SourceConfigUtils {
+
+    public static FunctionDetails convert(SourceConfig sourceConfig)
+            throws IllegalArgumentException, IOException {
+
+        String sourceClassName = null;
+        String typeArg = null;
+
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+
+        boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN);
+
+        if (!isBuiltin) {
+            if (sourceConfig.getArchive().startsWith(Utils.FILE)) {
+                if (org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) {
+                    throw new IllegalArgumentException("Class-name must be present for archive with file-url");
+                }
+                sourceClassName = sourceConfig.getClassName(); // server derives the arg-type by loading a class
+            } else {
+                sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
+
+                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()),
+                        Collections.emptySet())) {
+                    typeArg = getSourceType(sourceClassName, ncl).getName();
+                }
+            }
+        }
+
+        if (sourceConfig.getTenant() != null) {
+            functionDetailsBuilder.setTenant(sourceConfig.getTenant());
+        }
+        if (sourceConfig.getNamespace() != null) {
+            functionDetailsBuilder.setNamespace(sourceConfig.getNamespace());
+        }
+        if (sourceConfig.getName() != null) {
+            functionDetailsBuilder.setName(sourceConfig.getName());
+        }
+        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+        functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
+        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+        functionDetailsBuilder.setAutoAck(true);
+        if (sourceConfig.getProcessingGuarantees() != null) {
+            functionDetailsBuilder.setProcessingGuarantees(
+                    convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
+        }
+
+        // set source spec
+        Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
+        if (sourceClassName != null) {
+            sourceSpecBuilder.setClassName(sourceClassName);
+        }
+
+        if (isBuiltin) {
+            String builtin = sourceConfig.getArchive().replaceFirst("^builtin://", "");
+            sourceSpecBuilder.setBuiltin(builtin);
+        }
+
+        if (sourceConfig.getConfigs() != null) {
+            sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs()));
+        }
+
+        if (typeArg != null) {
+            sourceSpecBuilder.setTypeClassName(typeArg);
+        }
+        functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+        // set up sink spec.
+        // Sink spec classname should be empty so that the default pulsar sink will be used
+        Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
+        if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceConfig.getSchemaType())) {
+            sinkSpecBuilder.setSchemaType(sourceConfig.getSchemaType());
+        }
+        if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
+            sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
+        }
+
+        sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
+
+        if (typeArg != null) {
+            sinkSpecBuilder.setTypeClassName(typeArg);
+        }
+
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+        if (sourceConfig.getResources() != null) {
+            Function.Resources.Builder bldr = Function.Resources.newBuilder();
+            if (sourceConfig.getResources().getCpu() != null) {
+                bldr.setCpu(sourceConfig.getResources().getCpu());
+            }
+            if (sourceConfig.getResources().getRam() != null) {
+                bldr.setRam(sourceConfig.getResources().getRam());
+            }
+            if (sourceConfig.getResources().getDisk() != null) {
+                bldr.setDisk(sourceConfig.getResources().getDisk());
+            }
+            functionDetailsBuilder.setResources(bldr.build());
+        }
+
+        return functionDetailsBuilder.build();
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 94c315df3f..7befc859ee 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -101,9 +101,8 @@ public static int findAvailablePort() {
         }
     }
 
-    public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) {
-        Object userClass = createInstance(functionConfig.getClassName(),
-                Thread.currentThread().getContextClassLoader());
+    public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader) {
+        Object userClass = createInstance(functionConfig.getClassName(), classLoader);
         boolean isWindowConfigPresent = functionConfig.getWindowConfig() != null;
         return getFunctionTypes(userClass, isWindowConfigPresent);
     }
@@ -187,10 +186,6 @@ public static Runtime convertRuntime(FunctionConfig.Runtime runtime) {
         throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
     }
 
-    public static Class<?> getSourceType(String className) {
-        return getSourceType(className, Thread.currentThread().getContextClassLoader());
-    }
-
     public static Class<?> getSourceType(String className, ClassLoader classloader) {
 
         Object userClass = Reflections.createInstance(className, classloader);
@@ -205,10 +200,6 @@ public static Runtime convertRuntime(FunctionConfig.Runtime runtime) {
         return typeArg;
     }
 
-    public static Class<?> getSinkType(String className) {
-        return getSinkType(className, Thread.currentThread().getContextClassLoader());
-    }
-
     public static Class<?> getSinkType(String className, ClassLoader classLoader) {
 
         Object userClass = Reflections.createInstance(className, classLoader);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
index fe89a013fb..88ee41b4f9 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
@@ -44,7 +44,7 @@
         PYTHON
     }
 
-    public static void validateConfig(Object config, String runtimeType) {
+    public static void validateConfig(Object config, String runtimeType, ClassLoader classLoader) {
         for (Field field : config.getClass().getDeclaredFields()) {
             Object value;
             field.setAccessible(true);
@@ -53,12 +53,12 @@ public static void validateConfig(Object config, String runtimeType) {
             } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
             }
-            validateField(field, value, Runtime.valueOf(runtimeType));
+            validateField(field, value, Runtime.valueOf(runtimeType), classLoader);
         }
-        validateClass(config, Runtime.valueOf(runtimeType));
+        validateClass(config, Runtime.valueOf(runtimeType), classLoader);
     }
 
-    private static void validateClass(Object config, Runtime runtime) {
+    private static void validateClass(Object config, Runtime runtime, ClassLoader classLoader) {
 
         List<Annotation> annotationList = new LinkedList<>();
         Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
@@ -70,10 +70,10 @@ private static void validateClass(Object config, Runtime runtime) {
 
             }
         }
-        processAnnotations(annotationList, config.getClass().getName(), config, runtime);
+        processAnnotations(annotationList, config.getClass().getName(), config, runtime, classLoader);
     }
 
-    private static void validateField(Field field, Object value, Runtime runtime) {
+    private static void validateField(Field field, Object value, Runtime runtime, ClassLoader classLoader) {
         List<Annotation> annotationList = new LinkedList<>();
         Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
         for (Class clazz : classes) {
@@ -84,11 +84,11 @@ private static void validateField(Field field, Object value, Runtime runtime) {
 
             }
         }
-        processAnnotations(annotationList, field.getName(), value, runtime);
+        processAnnotations(annotationList, field.getName(), value, runtime, classLoader);
     }
 
     private static void processAnnotations( List<Annotation> annotations, String fieldName, Object value,
-                                           Runtime runtime) {
+                                           Runtime runtime, ClassLoader classLoader) {
         try {
             for (Annotation annotation : annotations) {
 
@@ -127,7 +127,7 @@ private static void processAnnotations( List<Annotation> annotations, String fie
                         } else { //If not call default constructor
                             o = clazz.newInstance();
                         }
-                        o.validateField(fieldName, value);
+                        o.validateField(fieldName, value, classLoader);
                     }
                 }
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
index e6e0583a70..08f0d661ad 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -177,6 +177,17 @@
         ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
+    /**
+     * checks if the serde class name refers to a valid SerDe implementation
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isValidSerde {
+        Class<?> validatorClass() default ValidatorImpls.SerdeValidator.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
+    }
+
     /**
      * checks if window configs is valid
      */
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
index 59410482a1..ab8782b546 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
@@ -27,5 +27,5 @@ public Validator(Map<String, Object> params) {
     public Validator() {
     }
 
-    public abstract void validateField(String name, Object o);
+    public abstract void validateField(String name, Object o, ClassLoader classLoader);
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index e8acc2839b..dba6bd91cd 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -92,7 +92,7 @@ public static void validateField(String name, boolean includeZero, Object o) {
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             validateField(name, this.includeZero, o);
         }
     }
@@ -104,7 +104,7 @@ public void validateField(String name, Object o) {
     public static class NotNullValidator extends Validator {
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             if (o == null) {
                 throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name));
             }
@@ -113,9 +113,9 @@ public void validateField(String name, Object o) {
 
     public static class ResourcesValidator extends Validator {
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             if (o == null) {
-                throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name));
+                return;
             }
 
             if (o instanceof Resources) {
@@ -152,7 +152,7 @@ public static void validateField(String name, Class<?> type, Object o) {
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             validateField(name, this.type, o);
         }
     }
@@ -176,7 +176,7 @@ public static void validateField(String name, Class<?> keyType, Class<?> valueTy
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             validateField(name, this.keyType, this.valueType, o);
         }
     }
@@ -194,7 +194,7 @@ public ImplementsClassValidator(Class<?> classImplements) {
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             if (o == null) {
                 return;
             }
@@ -206,7 +206,7 @@ public void validateField(String name, Object o) {
 
             Class<?> objectClass;
             try {
-                objectClass = loadClass(className);
+                objectClass = loadClass(className, classLoader);
             } catch (ClassNotFoundException e) {
                 throw new IllegalArgumentException("Cannot find/load class " + className);
             }
@@ -235,7 +235,7 @@ public ImplementsClassesValidator(Class<?>... classesImplements) {
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             if (o == null) {
                 return;
             }
@@ -248,7 +248,7 @@ public void validateField(String name, Object o) {
             for (Class<?> classImplements : classesImplements) {
                 Class<?> objectClass = null;
                 try {
-                    objectClass = loadClass(className);
+                    objectClass = loadClass(className, classLoader);
                 } catch (ClassNotFoundException e) {
                     throw new IllegalArgumentException("Cannot find/load class " + className);
                 }
@@ -269,8 +269,9 @@ public void validateField(String name, Object o) {
     public static class SerdeValidator extends Validator {
 
         @Override
-        public void validateField(String name, Object o) {
-            new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, o);
+        public void validateField(String name, Object o, ClassLoader classLoader) {
+            if (o != null && o.equals(DEFAULT_SERDE)) return;
+            new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, o, classLoader);
         }
     }
 
@@ -278,8 +279,8 @@ public void validateField(String name, Object o) {
     public static class SchemaValidator extends Validator {
 
         @Override
-        public void validateField(String name, Object o) {
-            new ValidatorImpls.ImplementsClassValidator(Schema.class).validateField(name, o);
+        public void validateField(String name, Object o, ClassLoader classLoader) {
+            new ValidatorImpls.ImplementsClassValidator(Schema.class).validateField(name, o, classLoader);
         }
     }
 
@@ -298,7 +299,8 @@ public MapEntryCustomValidator(Map<String, Object> params) {
         }
 
         @SuppressWarnings("unchecked")
-        public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o)
+        public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o,
+                                         ClassLoader classLoader)
                 throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
             if (o == null) {
                 return;
@@ -309,7 +311,7 @@ public static void validateField(String name, Class<?>[] keyValidators, Class<?>
                 for (Class<?> kv : keyValidators) {
                     Object keyValidator = kv.getConstructor().newInstance();
                     if (keyValidator instanceof Validator) {
-                        ((Validator) keyValidator).validateField(name + " Map key", entry.getKey());
+                        ((Validator) keyValidator).validateField(name + " Map key", entry.getKey(), classLoader);
                     } else {
                         log.warn(
                                 "validator: {} cannot be used in MapEntryCustomValidator to validate keys.  Individual entry validators must " +
@@ -320,7 +322,7 @@ public static void validateField(String name, Class<?>[] keyValidators, Class<?>
                 for (Class<?> vv : valueValidators) {
                     Object valueValidator = vv.getConstructor().newInstance();
                     if (valueValidator instanceof Validator) {
-                        ((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
+                        ((Validator) valueValidator).validateField(name + " Map value", entry.getValue(), classLoader);
                     } else {
                         log.warn(
                                 "validator: {} cannot be used in MapEntryCustomValidator to validate values.  Individual entry validators " +
@@ -332,9 +334,9 @@ public static void validateField(String name, Class<?>[] keyValidators, Class<?>
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             try {
-                validateField(name, this.keyValidators, this.valueValidators, o);
+                validateField(name, this.keyValidators, this.valueValidators, o, classLoader);
             } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                 throw new RuntimeException(e);
             }
@@ -357,7 +359,7 @@ public StringValidator(Map<String, Object> params) {
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             SimpleTypeValidator.validateField(name, String.class, o);
             if (this.acceptedValues != null) {
                 if (!this.acceptedValues.contains((String) o)) {
@@ -370,11 +372,8 @@ public void validateField(String name, Object o) {
     @NoArgsConstructor
     public static class FunctionConfigValidator extends Validator {
 
-        private static void doJavaChecks(FunctionConfig functionConfig, String name) {
-            Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig);
-
-            ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
-
+        private static void doJavaChecks(FunctionConfig functionConfig, String name, ClassLoader clsLoader) {
+            Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig, clsLoader);
             // inputs use default schema, so there is no check needed there
 
             // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
@@ -441,7 +440,7 @@ private static void validateSchema(String schemaType, Class<?> typeArg, String n
                 // If it's built-in, no need to validate
             } else {
                 try {
-                    new SchemaValidator().validateField(name, schemaType);
+                    new SchemaValidator().validateField(name, schemaType, clsLoader);
                 } catch (IllegalArgumentException ex) {
                     throw new IllegalArgumentException(
                             String.format("The input schema class %s does not not implement %s",
@@ -455,9 +454,10 @@ private static void validateSchema(String schemaType, Class<?> typeArg, String n
         private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader,
                                           boolean deser) {
             if (StringUtils.isEmpty(inputSerializer)) return;
+            if (inputSerializer.equals(DEFAULT_SERDE)) return;
             Class<?> serdeClass;
             try {
-                serdeClass = loadClass(inputSerializer);
+                serdeClass = loadClass(inputSerializer, clsLoader);
             } catch (ClassNotFoundException e) {
                 throw new IllegalArgumentException(
                         String.format("The input serialization/deserialization class %s does not exist",
@@ -465,7 +465,7 @@ private static void validateSerde(String inputSerializer, Class<?> typeArg, Stri
             }
 
             try {
-                new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer);
+                new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer, clsLoader);
             } catch (IllegalArgumentException ex) {
                 throw new IllegalArgumentException(
                         String.format("The input serialization/deserialization class %s does not not implement %s",
@@ -473,35 +473,31 @@ private static void validateSerde(String inputSerializer, Class<?> typeArg, Stri
                                 inputSerializer, SerDe.class.getCanonicalName()));
             }
 
-            if (inputSerializer.equals(DEFAULT_SERDE)) {
-                // No checks needed here
-            } else {
-                SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
-                if (serDe == null) {
-                    throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
-                            inputSerializer));
-                }
-                Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+            SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
+            if (serDe == null) {
+                throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
+                        inputSerializer));
+            }
+            Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
 
-                // type inheritance information seems to be lost in generic type
-                // load the actual type class for verification
-                Class<?> fnInputClass;
-                Class<?> serdeInputClass;
-                try {
-                    fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
-                    serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
-                } catch (ClassNotFoundException e) {
-                    throw new IllegalArgumentException("Failed to load type class", e);
-                }
+            // type inheritance information seems to be lost in generic type
+            // load the actual type class for verification
+            Class<?> fnInputClass;
+            Class<?> serdeInputClass;
+            try {
+                fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
+                serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException("Failed to load type class", e);
+            }
 
-                if (deser) {
-                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                        throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
-                    }
-                } else {
-                    if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
-                        throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
-                    }
+            if (deser) {
+                if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+                    throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+                }
+            } else {
+                if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
+                    throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
                 }
             }
         }
@@ -584,12 +580,12 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             FunctionConfig functionConfig = (FunctionConfig) o;
             doCommonChecks(functionConfig);
             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-                if (!functionConfig.getJar().startsWith(Utils.FILE)) {
-                    doJavaChecks(functionConfig, name);
+                if (classLoader != null) {
+                    doJavaChecks(functionConfig, name, classLoader);
                 }
             } else {
                 doPythonChecks(functionConfig, name);
@@ -609,7 +605,7 @@ public ListEntryCustomValidator(Map<String, Object> params) {
             this.entryValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
         }
 
-        public static void validateField(String name, Class<?>[] validators, Object o)
+        public static void validateField(String name, Class<?>[] validators, Object o, ClassLoader classLoader)
                 throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
             if (o == null) {
                 return;
@@ -620,7 +616,7 @@ public static void validateField(String name, Class<?>[] validators, Object o)
                 for (Class<?> validator : validators) {
                     Object v = validator.getConstructor().newInstance();
                     if (v instanceof Validator) {
-                        ((Validator) v).validateField(name + " list entry", entry);
+                        ((Validator) v).validateField(name + " list entry", entry, classLoader);
                     } else {
                         log.warn(
                                 "validator: {} cannot be used in ListEntryCustomValidator.  Individual entry validators must a instance of " +
@@ -632,9 +628,9 @@ public static void validateField(String name, Class<?>[] validators, Object o)
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             try {
-                validateField(name, this.entryValidators, o);
+                validateField(name, this.entryValidators, o, classLoader);
             } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
                 throw new RuntimeException(e);
             }
@@ -645,11 +641,11 @@ public void validateField(String name, Object o) {
     public static class TopicNameValidator extends Validator {
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             if (o == null) {
                 return;
             }
-            new StringValidator().validateField(name, o);
+            new StringValidator().validateField(name, o, classLoader);
             String topic = (String) o;
             if (!TopicName.isValid(topic)) {
                 throw new IllegalArgumentException(
@@ -715,7 +711,7 @@ public static void validateWindowConfig(WindowConfig windowConfig) {
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             if (o == null) {
                 return;
             }
@@ -729,7 +725,7 @@ public void validateField(String name, Object o) {
 
     public static class SourceConfigValidator extends Validator {
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             SourceConfig sourceConfig = (SourceConfig) o;
             if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) {
                 // We don't have to check the archive, since it's provided on the worker itself
@@ -743,31 +739,27 @@ public void validateField(String name, Object o) {
                 throw new IllegalArgumentException("Failed to extract source class from archive", e1);
             }
 
-            try (NarClassLoader clsLoader = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()),
-                    Collections.emptySet())) {
-                Class<?> typeArg = getSourceType(sourceClassName, clsLoader);
 
-                // Only one of serdeClassName or schemaType should be set
-                if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()
-                        && sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
-                    throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
-                }
+            Class<?> typeArg = getSourceType(sourceClassName, classLoader);
 
-                if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
-                    FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader, false);
-                }
-                if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
-                    FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader, false);
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException(e);
+            // Only one of serdeClassName or schemaType should be set
+            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()
+                    && sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
+                throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
+            }
+
+            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
+                FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, classLoader, false);
+            }
+            if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
+                FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, classLoader, false);
             }
         }
     }
 
     public static class SinkConfigValidator extends Validator {
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             SinkConfig sinkConfig = (SinkConfig) o;
             if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) {
                 // We don't have to check the archive, since it's provided on the worker itself
@@ -822,7 +814,7 @@ public void validateField(String name, Object o) {
                     });
                 }
             } catch (IOException e) {
-                throw new IllegalArgumentException(e);
+                throw new IllegalArgumentException(e.getMessage());
             }
         }
 
@@ -849,11 +841,11 @@ public void validateField(String name, Object o) {
 
     public static class FileValidator extends Validator {
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             if (o == null) {
                 return;
             }
-            new StringValidator().validateField(name, o);
+            new StringValidator().validateField(name, o, classLoader);
 
             String path = (String) o;
 
@@ -890,19 +882,18 @@ public static void validateField(String name, Class<?> type, Object o) {
         }
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o, ClassLoader classLoader) {
             validateField(name, this.type, o);
         }
     }
 
-    private static Class<?> loadClass(String className) throws ClassNotFoundException {
+    private static Class<?> loadClass(String className, ClassLoader classLoader) throws ClassNotFoundException {
         Class<?> objectClass;
         try {
             objectClass = Class.forName(className);
         } catch (ClassNotFoundException e) {
-            ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
-            if (clsLoader != null) {
-                objectClass = clsLoader.loadClass(className);
+            if (classLoader != null) {
+                objectClass = classLoader.loadClass(className);
             } else {
                 throw e;
             }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 6487dcfed2..aa80403009 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -27,6 +27,7 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
+import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -46,6 +47,7 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
 import org.apache.zookeeper.KeeperException.Code;
@@ -133,13 +135,19 @@ public static void uploadToBookeeper(Namespace dlogNamespace,
         }
     }
 
-    public static void validateFileUrl(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException {
+    public static ClassLoader validateFileUrl(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException {
         if (destPkgUrl.startsWith(FILE)) {
             URL url = new URL(destPkgUrl);
             File file = new File(url.toURI());
             if (!file.exists()) {
                 throw new IOException(destPkgUrl + " does not exists locally");
             }
+            try {
+                return Reflections.loadJar(file);
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException(
+                        "Corrupt User PackageFile " + file + " with error " + e.getMessage());
+            }
         } else if (destPkgUrl.startsWith("http")) {
             URL website = new URL(destPkgUrl);
             File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString());
@@ -150,6 +158,7 @@ public static void validateFileUrl(String destPkgUrl, String downloadPkgDir) thr
             if (tempFile.exists()) {
                 tempFile.delete();
             }
+            return null;
         } else {
             throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 4aa30166d2..53af197ad1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -20,8 +20,10 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.pulsar.functions.utils.Reflections.createInstance;
+import static org.apache.pulsar.functions.utils.Reflections.loadJar;
 import static org.apache.pulsar.functions.utils.Utils.FILE;
 import static org.apache.pulsar.functions.utils.Utils.HTTP;
 import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
@@ -38,6 +40,9 @@
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.file.CopyOption;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -66,6 +71,7 @@
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.join;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -82,7 +88,10 @@
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
+import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.Utils;
@@ -125,7 +134,8 @@ private boolean isWorkerServiceAvailable() {
 
     public Response registerFunction(final String tenant, final String namespace, final String functionName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson, final String clientRole) {
+            final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
+            final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -144,16 +154,28 @@ public Response registerFunction(final String tenant, final String namespace, fi
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+            log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Function %s already exists", functionName))).build();
+        }
+
         FunctionDetails functionDetails;
         boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
+        File uploadedInputStreamAsFile = null;
+        if (uploadedInputStream != null) {
+            uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream);
+        }
         // validate parameters
         try {
             if (isPkgUrlProvided) {
                 functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
-                        functionDetailsJson);
+                        functionDetailsJson, functionConfigJson);
             } else {
-                functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStream,
-                        fileDetail, functionDetailsJson);
+                functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
+                        fileDetail, functionDetailsJson, functionConfigJson);
             }
         } catch (Exception e) {
             log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
@@ -161,14 +183,6 @@ public Response registerFunction(final String tenant, final String namespace, fi
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
-        if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s already exists", functionName))).build();
-        }
-
         try {
             worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
         } catch (Exception e) {
@@ -195,12 +209,13 @@ public Response registerFunction(final String tenant, final String namespace, fi
 
         functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
         return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build())
-                : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
+                : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile);
     }
 
     public Response updateFunction(final String tenant, final String namespace, final String functionName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson, final String clientRole) {
+            final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
+            final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -219,16 +234,27 @@ public Response updateFunction(final String tenant, final String namespace, fina
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+        }
+
         FunctionDetails functionDetails;
         boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
+        File uploadedInputStreamAsFile = null;
+        if (uploadedInputStream != null) {
+            uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream);
+        }
         // validate parameters
         try {
             if (isPkgUrlProvided) {
                 functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
-                        functionDetailsJson);
+                        functionDetailsJson, functionConfigJson);
             } else {
-                functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStream,
-                        fileDetail, functionDetailsJson);
+                functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
+                        fileDetail, functionDetailsJson, functionConfigJson);
             }
         } catch (Exception e) {
             log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
@@ -236,13 +262,6 @@ public Response updateFunction(final String tenant, final String namespace, fina
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
-        }
-
         try {
             worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
         } catch (Exception e) {
@@ -270,7 +289,7 @@ public Response updateFunction(final String tenant, final String namespace, fina
 
         functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
         return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build())
-                : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
+                : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile);
     }
 
     public Response deregisterFunction(final String tenant, final String namespace, final String functionName,
@@ -556,10 +575,11 @@ public Response listFunctions(final String tenant, final String namespace) {
         return Response.status(Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
     }
 
-    private Response updateRequest(FunctionMetaData functionMetaData, InputStream uploadedInputStream) {
+    private Response updateRequest(FunctionMetaData functionMetaData, File uploadedInputStreamAsFile) {
         // Upload to bookkeeper
         try {
             log.info("Uploading function package to {}", functionMetaData.getPackageLocation());
+            FileInputStream uploadedInputStream = new FileInputStream(uploadedInputStreamAsFile);
 
             Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream,
                     functionMetaData.getPackageLocation().getPackagePath());
@@ -854,30 +874,41 @@ private void validateDeregisterRequestParams(String tenant, String namespace, St
     }
 
     private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName,
-            String functionPkgUrl, String functionDetailsJson)
+            String functionPkgUrl, String functionDetailsJson, String functionConfigJson)
             throws IllegalArgumentException, IOException, URISyntaxException {
         if (!isFunctionPackageUrlSupported(functionPkgUrl)) {
             throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
         }
-        Utils.validateFileUrl(functionPkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                functionDetailsJson, functionPkgUrl);
+                functionDetailsJson, functionConfigJson, functionPkgUrl, null);
         return functionDetails;
     }
 
     private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
-            InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionDetailsJson)
-            throws IllegalArgumentException {
+            File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
+            String functionConfigJson)
+            throws IllegalArgumentException, IOException, URISyntaxException {
 
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                functionDetailsJson, null);
-        if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStream == null || fileDetail == null)) {
+                functionDetailsJson, functionConfigJson, null, uploadedInputStreamAsFile);
+        if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) {
             throw new IllegalArgumentException("Function Package is not provided");
         }
 
         return functionDetails;
     }
 
+    private static File dumpToTmpFile(InputStream uploadedInputStream) {
+        try {
+            File tmpFile = File.createTempFile("functions", null);
+            tmpFile.deleteOnExit();
+            Files.copy(uploadedInputStream, tmpFile.toPath(), REPLACE_EXISTING);
+            return tmpFile;
+        } catch (IOException e) {
+            throw new RuntimeException("Cannot create a temporary file", e);
+        }
+    }
+
     private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key)
         throws IllegalArgumentException {
 
@@ -932,7 +963,7 @@ private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
     }
 
     private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
-            String functionDetailsJson, String functionPkgUrl) throws IllegalArgumentException {
+            String functionDetailsJson, String functionConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -943,63 +974,87 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
             throw new IllegalArgumentException("Function Name is not provided");
         }
 
-        if (functionDetailsJson == null) {
-            throw new IllegalArgumentException("FunctionDetails is not provided");
+        if (StringUtils.isEmpty(functionDetailsJson) && StringUtils.isEmpty(functionConfigJson)) {
+            throw new IllegalArgumentException("FunctionConfig is not provided");
         }
-        try {
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder);
-            if (isNotBlank(functionPkgUrl)) {
-                // validate function details by loading function-jar from local file-system
-                File jarWithFileUrl = functionPkgUrl.startsWith(FILE) ? (new File((new URL(functionPkgUrl)).toURI()))
-                        : null;
-                validateFunctionClassTypes(jarWithFileUrl, functionDetailsBuilder);
-                // set package-url if present
-                functionDetailsBuilder.setPackageUrl(functionPkgUrl);
-            }
-            FunctionDetails functionDetails = functionDetailsBuilder.build();
-
-            List<String> missingFields = new LinkedList<>();
-            if (functionDetails.getTenant() == null || functionDetails.getTenant().isEmpty()) {
-                missingFields.add("Tenant");
-            }
-            if (functionDetails.getNamespace() == null || functionDetails.getNamespace().isEmpty()) {
-                missingFields.add("Namespace");
-            }
-            if (functionDetails.getName() == null || functionDetails.getName().isEmpty()) {
-                missingFields.add("Name");
-            }
-            if (functionDetails.getClassName() == null || functionDetails.getClassName().isEmpty()) {
-                missingFields.add("ClassName");
-            }
-            // TODO in the future add more check here for functions and connectors
-            if (!functionDetails.getSource().isInitialized()) {
-                missingFields.add("Source");
-            }
-            // TODO in the future add more check here for functions and connectors
-            if (!functionDetails.getSink().isInitialized()) {
-                missingFields.add("Sink");
+        if (!StringUtils.isEmpty(functionDetailsJson) && !StringUtils.isEmpty(functionConfigJson)) {
+            throw new IllegalArgumentException("Only one of FunctionDetails or FunctionConfig should be provided");
+        }
+        if (!StringUtils.isEmpty(functionConfigJson)) {
+            FunctionConfig functionConfig = new Gson().fromJson(functionConfigJson, FunctionConfig.class);
+            ClassLoader clsLoader = null;
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
             }
-            if (!missingFields.isEmpty()) {
-                String errorMessage = join(missingFields, ",");
-                throw new IllegalArgumentException(errorMessage + " is not provided");
+            if (functionConfig.getRuntime() == null) {
+                throw new IllegalArgumentException("Function Runtime no specified");
             }
-            if (functionDetails.getParallelism() <= 0) {
-                throw new IllegalArgumentException("Parallelism needs to be set to a positive number");
+            ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader);
+            return FunctionConfigUtils.convert(functionConfig, clsLoader);
+        }
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+        org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder);
+        if (isNotBlank(functionPkgUrl)) {
+            // set package-url if present
+            functionDetailsBuilder.setPackageUrl(functionPkgUrl);
+        }
+        ClassLoader clsLoader = null;
+        if (functionDetailsBuilder.getRuntime() == FunctionDetails.Runtime.JAVA) {
+            clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
+        }
+        validateFunctionClassTypes(clsLoader, functionDetailsBuilder);
+
+        FunctionDetails functionDetails = functionDetailsBuilder.build();
+
+        List<String> missingFields = new LinkedList<>();
+        if (functionDetails.getTenant() == null || functionDetails.getTenant().isEmpty()) {
+            missingFields.add("Tenant");
+        }
+        if (functionDetails.getNamespace() == null || functionDetails.getNamespace().isEmpty()) {
+            missingFields.add("Namespace");
+        }
+        if (functionDetails.getName() == null || functionDetails.getName().isEmpty()) {
+            missingFields.add("Name");
+        }
+        if (functionDetails.getClassName() == null || functionDetails.getClassName().isEmpty()) {
+            missingFields.add("ClassName");
+        }
+        // TODO in the future add more check here for functions and connectors
+        if (!functionDetails.getSource().isInitialized()) {
+            missingFields.add("Source");
+        }
+        // TODO in the future add more check here for functions and connectors
+        if (!functionDetails.getSink().isInitialized()) {
+            missingFields.add("Sink");
+        }
+        if (!missingFields.isEmpty()) {
+            String errorMessage = join(missingFields, ",");
+            throw new IllegalArgumentException(errorMessage + " is not provided");
+        }
+        if (functionDetails.getParallelism() <= 0) {
+            throw new IllegalArgumentException("Parallelism needs to be set to a positive number");
+        }
+        return functionDetails;
+    }
+
+    private ClassLoader extractClassLoader(String functionPkgUrl, File uploadedInputStreamAsFile) throws URISyntaxException, IOException {
+        if (isNotBlank(functionPkgUrl)) {
+            return Utils.validateFileUrl(functionPkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());
+        } else if (uploadedInputStreamAsFile != null) {
+            try {
+                return loadJar(uploadedInputStreamAsFile);
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException("Corrupted Jar File", e);
             }
-            return functionDetails;
-        } catch (IllegalArgumentException ex) {
-            throw ex;
-        } catch (Exception ex) {
-            throw new IllegalArgumentException("Invalid FunctionDetails");
+        } else {
+            return null;
         }
     }
 
-    private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder functionDetailsBuilder)
-            throws MalformedURLException {
+    private void validateFunctionClassTypes(ClassLoader classLoader, FunctionDetails.Builder functionDetailsBuilder) {
 
-        // validate only if jar-file is provided
-        if (jarFile == null) {
+        // validate only if classLoader is provided
+        if (classLoader == null) {
             return;
         }
 
@@ -1007,10 +1062,6 @@ private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder fu
             throw new IllegalArgumentException("function class-name can't be empty");
         }
 
-        URL[] urls = new URL[1];
-        urls[0] = jarFile.toURI().toURL();
-        URLClassLoader classLoader = create(urls, FunctionClassLoaders.class.getClassLoader());
-
         // validate function class-type
         Object functionObject = createInstance(functionDetailsBuilder.getClassName(), classLoader);
         Class<?>[] typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false);
@@ -1073,7 +1124,7 @@ private void validateFunctionClassTypes(File jarFile, FunctionDetails.Builder fu
 
     }
 
-    private Class<?> getTypeArg(String className, Class<?> funClass, URLClassLoader classLoader)
+    private Class<?> getTypeArg(String className, Class<?> funClass, ClassLoader classLoader)
             throws ClassNotFoundException {
         Class<?> loadedClass = classLoader.loadClass(className);
         if (!funClass.isAssignableFrom(loadedClass)) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 789fbea67c..1e44a6072b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -56,10 +56,11 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
                                      final @FormDataParam("data") InputStream uploadedInputStream,
                                      final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                      final @FormDataParam("url") String functionPkgUrl,
-                                     final @FormDataParam("functionDetails") String functionDetailsJson) {
+                                     final @FormDataParam("functionDetails") String functionDetailsJson,
+                                     final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
 
     }
 
@@ -72,10 +73,11 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("data") InputStream uploadedInputStream,
                                    final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                    final @FormDataParam("url") String functionPkgUrl,
-                                   final @FormDataParam("functionDetails") String functionDetailsJson) {
+                                   final @FormDataParam("functionDetails") String functionDetailsJson,
+                                   final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
 
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 3d7620ed42..9ce87d016e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -37,16 +37,14 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
@@ -64,6 +62,7 @@
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
@@ -83,6 +82,7 @@
  */
 @PrepareForTest(Utils.class)
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
+@Slf4j
 public class FunctionApiV2ResourceTest {
 
     @ObjectFactory
@@ -92,7 +92,8 @@ public IObjectFactory getObjectFactory() {
 
     private static final class TestFunction implements Function<String, String> {
 
-        public String process(String input, Context context) throws Exception {
+        @Override
+        public String process(String input, Context context) {
             return input;
         }
     }
@@ -100,15 +101,15 @@ public String process(String input, Context context) throws Exception {
     public static final class TestSink implements Sink<byte[]> {
 
         @Override
-        public void close() throws Exception {
+        public void close() {
         }
 
         @Override
-        public void open(Map config, SinkContext sinkContext) throws Exception {
+        public void open(Map config, SinkContext sinkContext) {
         }
 
         @Override
-        public void write(Record<byte[]> record) throws Exception {
+        public void write(Record<byte[]> record) {
         }
     }
 
@@ -176,12 +177,13 @@ public void testRegisterFunctionMissingTenant() throws IOException {
             namespace,
             function,
             mockedInputStream,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Tenant");
+                "Tenant is not provided");
     }
 
     @Test
@@ -191,12 +193,13 @@ public void testRegisterFunctionMissingNamespace() throws IOException {
             null,
             function,
             mockedInputStream,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Namespace");
+                "Namespace is not provided");
     }
 
     @Test
@@ -206,12 +209,13 @@ public void testRegisterFunctionMissingFunctionName() throws IOException {
             namespace,
             null,
             mockedInputStream,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Name");
+                "Function Name is not provided");
     }
 
     @Test
@@ -221,12 +225,29 @@ public void testRegisterFunctionMissingPackage() throws IOException {
             namespace,
             function,
             null,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package");
+                "Function Package is not provided");
+    }
+
+    @Test
+    public void testRegisterFunctionMissingInputTopics() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                null,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "No input topic(s) specified for the function");
     }
 
     @Test
@@ -236,12 +257,13 @@ public void testRegisterFunctionMissingPackageDetails() throws IOException {
             namespace,
             function,
             mockedInputStream,
+            topicsToSerDeClassName,
             null,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package");
+                "Function Package is not provided");
     }
 
     @Test
@@ -251,12 +273,13 @@ public void testRegisterFunctionMissingClassName() throws IOException {
             namespace,
             function,
             mockedInputStream,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             null,
             parallelism,
-                "ClassName");
+                "Field 'className' cannot be null!");
     }
 
     @Test
@@ -266,12 +289,13 @@ public void testRegisterFunctionMissingParallelism() throws IOException {
                 namespace,
                 function,
                 mockedInputStream,
+                topicsToSerDeClassName,
                 mockedFormData,
                 outputTopic,
                 outputSerdeClassName,
                 className,
                 null,
-                "parallelism");
+                "Field 'parallelism' must be a Positive Number");
     }
 
     private void testRegisterFunctionMissingArguments(
@@ -279,38 +303,40 @@ private void testRegisterFunctionMissingArguments(
             String namespace,
             String function,
             InputStream inputStream,
+            Map<String, String> topicsToSerDeClassName,
             FormDataContentDisposition details,
             String outputTopic,
             String outputSerdeClassName,
             String className,
             Integer parallelism,
-            String missingFieldName) throws IOException {
-        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+            String errorExpected) throws IOException {
+        FunctionConfig functionConfig = new FunctionConfig();
         if (tenant != null) {
-            functionDetailsBuilder.setTenant(tenant);
+            functionConfig.setTenant(tenant);
         }
         if (namespace != null) {
-            functionDetailsBuilder.setNamespace(namespace);
+            functionConfig.setNamespace(namespace);
         }
         if (function != null) {
-            functionDetailsBuilder.setName(function);
+            functionConfig.setName(function);
+        }
+        if (topicsToSerDeClassName != null) {
+            functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
         }
-        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
         if (outputTopic != null) {
-            sinkSpecBuilder.setTopic(outputTopic);
+            functionConfig.setOutput(outputTopic);
         }
         if (outputSerdeClassName != null) {
-            sinkSpecBuilder.setSerDeClassName(outputSerdeClassName);
+            functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         }
-        functionDetailsBuilder.setSink(sinkSpecBuilder);
         if (className != null) {
-            functionDetailsBuilder.setClassName(className);
+            functionConfig.setClassName(className);
         }
         if (parallelism != null) {
-            functionDetailsBuilder.setParallelism(parallelism);
+            functionConfig.setParallelism(parallelism);
         }
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
 
-        FunctionDetails functionDetails = functionDetailsBuilder.build();
         Response response = resource.registerFunction(
                 tenant,
                 namespace,
@@ -318,28 +344,25 @@ private void testRegisterFunctionMissingArguments(
                 inputStream,
                 details,
                 null,
-                org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+                null,
+                new Gson().toJson(functionConfig),
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        if (missingFieldName.equals("parallelism")) {
-            Assert.assertEquals(new ErrorData("Parallelism needs to be set to a positive number").reason, ((ErrorData) response.getEntity()).reason);
-        } else {
-            Assert.assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
-        }
+        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason);
     }
 
-    private Response registerDefaultFunction() throws IOException {
-        SinkSpec sinkSpec = SinkSpec.newBuilder()
-                .setTopic(outputTopic)
-                .setSerDeClassName(outputSerdeClassName).build();
-        FunctionDetails functionDetails = FunctionDetails.newBuilder()
-                .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setSink(sinkSpec)
-                .setClassName(className)
-                .setParallelism(parallelism)
-                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
-                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+    private Response registerDefaultFunction() {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         return resource.registerFunction(
             tenant,
             namespace,
@@ -347,7 +370,8 @@ private Response registerDefaultFunction() throws IOException {
             mockedInputStream,
             mockedFormData,
             null,
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null,
+            new Gson().toJson(functionConfig),
             null);
     }
 
@@ -452,12 +476,13 @@ public void testUpdateFunctionMissingTenant() throws IOException {
             namespace,
             function,
             mockedInputStream,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Tenant");
+                "Tenant is not provided");
     }
 
     @Test
@@ -467,12 +492,13 @@ public void testUpdateFunctionMissingNamespace() throws IOException {
             null,
             function,
             mockedInputStream,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Namespace");
+                "Namespace is not provided");
     }
 
     @Test
@@ -482,12 +508,13 @@ public void testUpdateFunctionMissingFunctionName() throws IOException {
             namespace,
             null,
             mockedInputStream,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Name");
+                "Function Name is not provided");
     }
 
     @Test
@@ -497,12 +524,29 @@ public void testUpdateFunctionMissingPackage() throws IOException {
             namespace,
             function,
             null,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package");
+                "Function Package is not provided");
+    }
+
+    @Test
+    public void testUpdateFunctionMissingInputTopic() throws IOException {
+        testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                null,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "No input topic(s) specified for the function");
     }
 
     @Test
@@ -512,12 +556,13 @@ public void testUpdateFunctionMissingPackageDetails() throws IOException {
             namespace,
             function,
             mockedInputStream,
+            topicsToSerDeClassName,
             null,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package");
+                "Function Package is not provided");
     }
 
     @Test
@@ -527,12 +572,13 @@ public void testUpdateFunctionMissingClassName() throws IOException {
             namespace,
             function,
             mockedInputStream,
+            topicsToSerDeClassName,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             null,
             parallelism,
-                "ClassName");
+                "Field 'className' cannot be null!");
     }
     @Test
     public void testUpdateFunctionMissingParallelism() throws IOException {
@@ -541,12 +587,13 @@ public void testUpdateFunctionMissingParallelism() throws IOException {
                 namespace,
                 function,
                 mockedInputStream,
+                topicsToSerDeClassName,
                 mockedFormData,
                 outputTopic,
                 outputSerdeClassName,
                 className,
                 null,
-                "parallelism");
+                "Field 'parallelism' must be a Positive Number");
     }
 
     private void testUpdateFunctionMissingArguments(
@@ -554,38 +601,42 @@ private void testUpdateFunctionMissingArguments(
             String namespace,
             String function,
             InputStream inputStream,
+            Map<String, String> topicsToSerDeClassName,
             FormDataContentDisposition details,
             String outputTopic,
             String outputSerdeClassName,
             String className,
             Integer parallelism,
-            String missingFieldName) throws IOException {
-        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+            String expectedError) throws IOException {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+        FunctionConfig functionConfig = new FunctionConfig();
         if (tenant != null) {
-            functionDetailsBuilder.setTenant(tenant);
+            functionConfig.setTenant(tenant);
         }
         if (namespace != null) {
-            functionDetailsBuilder.setNamespace(namespace);
+            functionConfig.setNamespace(namespace);
         }
         if (function != null) {
-            functionDetailsBuilder.setName(function);
+            functionConfig.setName(function);
+        }
+        if (topicsToSerDeClassName != null) {
+            functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
         }
-        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
         if (outputTopic != null) {
-            sinkSpecBuilder.setTopic(outputTopic);
+            functionConfig.setOutput(outputTopic);
         }
         if (outputSerdeClassName != null) {
-            sinkSpecBuilder.setSerDeClassName(outputSerdeClassName);
+            functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         }
-        functionDetailsBuilder.setSink(sinkSpecBuilder);
         if (className != null) {
-            functionDetailsBuilder.setClassName(className);
+            functionConfig.setClassName(className);
         }
         if (parallelism != null) {
-            functionDetailsBuilder.setParallelism(parallelism);
+            functionConfig.setParallelism(parallelism);
         }
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
 
-        FunctionDetails functionDetails = functionDetailsBuilder.build();
         Response response = resource.updateFunction(
             tenant,
             namespace,
@@ -593,28 +644,26 @@ private void testUpdateFunctionMissingArguments(
             inputStream,
             details,
             null,
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null,
+            new Gson().toJson(functionConfig),
             null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        if (missingFieldName.equals("parallelism")) {
-            Assert.assertEquals(new ErrorData("Parallelism needs to be set to a positive number").reason, ((ErrorData) response.getEntity()).reason);
-        } else {
-            Assert.assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
-        }
+        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason);
     }
 
     private Response updateDefaultFunction() throws IOException {
-        SinkSpec sinkSpec = SinkSpec.newBuilder()
-                .setTopic(outputTopic)
-                .setSerDeClassName(outputSerdeClassName).build();
-        FunctionDetails functionDetails = FunctionDetails.newBuilder()
-                .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setSink(sinkSpec)
-                .setClassName(className)
-                .setParallelism(parallelism)
-                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
-                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+
         return resource.updateFunction(
             tenant,
             namespace,
@@ -622,7 +671,8 @@ private Response updateDefaultFunction() throws IOException {
             mockedInputStream,
             mockedFormData,
             null,
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null,
+            new Gson().toJson(functionConfig),
             null);
     }
 
@@ -679,16 +729,16 @@ public void testUpdateFunctionWithUrl() throws IOException {
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String filePackageUrl = "file://" + fileLocation;
 
-        SinkSpec sinkSpec = SinkSpec.newBuilder()
-                .setTopic(outputTopic)
-                .setSerDeClassName(outputSerdeClassName).build();
-        FunctionDetails functionDetails = FunctionDetails.newBuilder()
-                .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setSink(sinkSpec)
-                .setClassName(className)
-                .setParallelism(parallelism)
-                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
-                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
         RequestResult rr = new RequestResult()
@@ -704,7 +754,8 @@ public void testUpdateFunctionWithUrl() throws IOException {
             null,
             null,
             filePackageUrl,
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null,
+            new Gson().toJson(functionConfig),
             null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -1046,19 +1097,24 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException {
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        SinkSpec sinkSpec = SinkSpec.newBuilder().setClassName(TestSink.class.getName()).setTopic(outputTopic)
-                .setSerDeClassName(outputSerdeClassName).build();
-        FunctionDetails functionDetails = FunctionDetails
-                .newBuilder().setTenant(tenant).setNamespace(namespace).setName(function).setSink(sinkSpec)
-                .setClassName(className).setParallelism(parallelism).setSource(SourceSpec.newBuilder()
-                        .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
-                .build();
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
-                org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
+                null, new Gson().toJson(functionConfig), null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
+    /*
+    TODO:- Needs to be moved to a source/sink specific unittest
     @Test
     public void testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException {
         Configurator.setRootLevel(Level.DEBUG);
@@ -1071,6 +1127,16 @@ public void testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         SinkSpec sinkSpec = SinkSpec.newBuilder().setClassName(className).setTopic(outputTopic)
                 .setSerDeClassName(outputSerdeClassName).build();
         FunctionDetails functionDetails = FunctionDetails
@@ -1079,8 +1145,9 @@ public void testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException
                         .setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
                 .build();
         Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
-                org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
+                null, new Gson().toJson(functionConfig), null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(response.getStatus(), Status.BAD_REQUEST.getStatusCode());
     }
+    */
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services