You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/22 20:01:41 UTC

[GitHub] srkukarni closed pull request #2813: Removed Validation Annotations and do manual Validation

srkukarni closed pull request #2813: Removed Validation Annotations and do manual Validation
URL: https://github.com/apache/pulsar/pull/2813
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 6d7dbd2f94..1a1a95b722 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -34,8 +34,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -47,11 +45,8 @@
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -229,7 +224,7 @@ protected static FunctionConfig createFunctionConfig(String jarFile, String tena
 
         File file = new File(jarFile);
         try {
-            Reflections.loadJar(file);
+            Utils.loadJar(file);
         } catch (MalformedURLException e) {
             throw new RuntimeException("Failed to load user jar " + file, e);
         }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 0c512b82a5..bf4af8ab72 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -42,6 +42,7 @@
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -95,6 +96,7 @@ public IObjectFactory getObjectFactory() {
     }
 
     private static final String TEST_NAME = "test_name";
+    private static final String JAR_NAME = CmdFunctionsTest.class.getClassLoader().getResource("dummyexamples.jar").getFile();
 
     private PulsarAdmin admin;
     private Functions functions;
@@ -201,7 +203,7 @@ public void testCreateFunction() throws Exception {
             "--name", fnName,
             "--inputs", inputTopicName,
             "--output", outputTopicName,
-            "--jar", "SomeJar.jar",
+            "--jar", JAR_NAME,
             "--auto-ack", "false",
             "--tenant", "sample",
             "--namespace", "ns1",
@@ -273,7 +275,7 @@ public void stopFunctionInstances() throws Exception {
 
         verify(functions, times(1)).stopFunction(tenant, namespace, fnName);
     }
-    
+
     @Test
     public void testCreateFunctionWithHttpUrl() throws Exception {
         String fnName = TEST_NAME + "-function";
@@ -300,7 +302,7 @@ public void testCreateFunctionWithHttpUrl() throws Exception {
         consoleOutputCapturer.stop();
         String output = consoleOutputCapturer.getStderr();
 
-        assertTrue(output.contains("Failed to download jar"));
+        assertTrue(output.contains("Corrupted Jar File"));
         assertEquals(fnName, creater.getFunctionName());
         assertEquals(inputTopicName, creater.getInputs());
         assertEquals(outputTopicName, creater.getOutput());
@@ -320,14 +322,14 @@ public void testGetFunctionStatus() throws Exception {
 
         verify(functions, times(1)).getFunctionStatus(tenant, namespace, fnName, instanceId);
     }
-    
+
     @Test
     public void testCreateFunctionWithFileUrl() throws Exception {
         String fnName = TEST_NAME + "-function";
         String inputTopicName = TEST_NAME + "-input-topic";
         String outputTopicName = TEST_NAME + "-output-topic";
 
-        final String url = "file:/usr/temp/myfile.jar";
+        final String url = "file:" + JAR_NAME;
         cmd.run(new String[] {
             "create",
             "--name", fnName,
@@ -371,7 +373,7 @@ public void testCreateSink() throws Exception {
         consoleOutputCapturer.stop();
         String output = consoleOutputCapturer.getStderr();
 
-        assertTrue(output.contains("Failed to download archive"));
+        assertTrue(output.contains("Corrupt User PackageFile " + url));
         assertEquals(url, creater.archive);
     }
 
@@ -389,6 +391,7 @@ public void testCreateSource() throws Exception {
             "--archive", url,
             "--tenant", "sample",
             "--namespace", "ns1",
+            "--destination-topic-name", "input",
         });
 
         CreateSource creater = cmdSources.getCreateSource();
@@ -396,7 +399,7 @@ public void testCreateSource() throws Exception {
         consoleOutputCapturer.stop();
         String output = consoleOutputCapturer.getStderr();
 
-        assertTrue(output.contains("Failed to download archive"));
+        assertTrue(output.contains("Corrupt User PackageFile " + url));
         assertEquals(url, creater.archive);
     }
 
@@ -410,7 +413,7 @@ public void testCreateFunctionWithTopicPatterns() throws Exception {
             "--name", fnName,
             "--topicsPattern", topicPatterns,
             "--output", outputTopicName,
-            "--jar", "SomeJar.jar",
+            "--jar", JAR_NAME,
             "--tenant", "sample",
             "--namespace", "ns1",
             "--className", DummyFunction.class.getName(),
@@ -435,7 +438,7 @@ public void testCreateWithoutTenant() throws Exception {
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
         });
@@ -455,7 +458,7 @@ public void testCreateWithoutNamespace() throws Exception {
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--className", DummyFunction.class.getName(),
         });
 
@@ -479,7 +482,7 @@ public void testCreateUsingFullyQualifiedFunctionName() throws Exception {
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
                 "--fqfn", fqfn,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--className", DummyFunction.class.getName(),
         });
 
@@ -498,7 +501,7 @@ public void testCreateWithoutFunctionName() throws Exception {
                 "create",
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
@@ -515,7 +518,7 @@ public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
         cmd.run(new String[] {
                 "create",
                 "--inputs", inputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
@@ -527,7 +530,7 @@ public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
 
     }
 
-    
+
     @Test
     public void testCreateWithoutOutputTopic() {
 
@@ -538,7 +541,7 @@ public void testCreateWithoutOutputTopic() {
         cmd.run(new String[] {
                 "create",
                 "--inputs", inputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
@@ -603,7 +606,7 @@ public void testUpdateFunction() throws Exception {
             "--name", fnName,
             "--inputs", inputTopicName,
             "--output", outputTopicName,
-            "--jar", "SomeJar.jar",
+            "--jar", JAR_NAME,
             "--tenant", "sample",
             "--namespace", "ns1",
             "--className", DummyFunction.class.getName(),
@@ -735,7 +738,7 @@ public void TestCreateFunctionParallelism() throws Exception{
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
@@ -746,14 +749,14 @@ public void TestCreateFunctionParallelism() throws Exception{
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
                 "--parallelism", "-1"
         };
 
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Field 'parallelism' must be a Positive Number");
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Function parallelism should positive number");
 
     }
 
@@ -764,7 +767,7 @@ public void TestCreateTopicName() throws Exception {
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
@@ -775,13 +778,13 @@ public void TestCreateTopicName() throws Exception {
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", wrongOutputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
         };
 
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "The topic name " + wrongOutputTopicName + " is invalid for field 'output'");
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Output topic " + wrongOutputTopicName + " is invalid");
     }
 
     @Test
@@ -791,7 +794,7 @@ public void TestCreateClassName() throws Exception {
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
@@ -802,13 +805,13 @@ public void TestCreateClassName() throws Exception {
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", cannotLoadClass,
         };
 
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Cannot find/load class " + cannotLoadClass);
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "User class must be in class path");
     }
 
     @Test
@@ -818,7 +821,7 @@ public void TestCreateSameInOutTopic() throws Exception {
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", outputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
@@ -828,7 +831,7 @@ public void TestCreateSameInOutTopic() throws Exception {
                 "--name", fnName,
                 "--inputs", inputTopicName,
                 "--output", inputTopicName,
-                "--jar", "SomeJar.jar",
+                "--jar", JAR_NAME,
                 "--tenant", "sample",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
diff --git a/pulsar-client-tools-test/src/test/resources/dummyexamples.jar b/pulsar-client-tools-test/src/test/resources/dummyexamples.jar
new file mode 100644
index 0000000000..0407080d1d
Binary files /dev/null and b/pulsar-client-tools-test/src/test/resources/dummyexamples.jar differ
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 7e44b440fd..dbc48cc153 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -24,11 +24,8 @@
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.fileExists;
-import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -46,7 +43,6 @@
 
 import java.io.File;
 import java.lang.reflect.Type;
-import java.net.MalformedURLException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -69,25 +65,12 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.proto.Function.Resources;
-import org.apache.pulsar.functions.proto.Function.RetryDetails;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.*;
-import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.validation.ConfigValidation;
-import org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator;
-import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
 import org.apache.pulsar.functions.windowing.WindowUtils;
 
 @Slf4j
@@ -505,48 +488,19 @@ protected void validateFunctionConfigs(FunctionConfig functionConfig) {
                         + " be specified for the function. Please specify one.");
             }
 
-            boolean isJarPathUrl = isNotBlank(functionConfig.getJar()) && Utils.isFunctionPackageUrlSupported(functionConfig.getJar());
-            String jarFilePath = null;
-            if (isJarPathUrl) {
-                if (functionConfig.getJar().startsWith(Utils.HTTP)) {
-                    // download jar file if url is http or file is downloadable
-                    File tempPkgFile = null;
-                    try {
-                        tempPkgFile = downloadFromHttpUrl(functionConfig.getJar(), functionConfig.getName());
-                        jarFilePath = tempPkgFile.getAbsolutePath();
-                    } catch (Exception e) {
-                        if (tempPkgFile != null) {
-                            tempPkgFile.deleteOnExit();
-                        }
-                        throw new ParameterException("Failed to download jar from " + functionConfig.getJar()
-                                + ", due to =" + e.getMessage());
-                    }
-                }
-            } else {
-                if (!fileExists(userCodeFile)) {
-                    throw new ParameterException("File " + userCodeFile + " does not exist");
-                }
-                jarFilePath = userCodeFile;
+            if (!isBlank(functionConfig.getJar()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getJar()) &&
+                    !new File(functionConfig.getJar()).exists()) {
+                throw new ParameterException("The specified jar file does not exist");
             }
-
-            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-
-                if (jarFilePath != null) {
-                    File file = new File(jarFilePath);
-                    try {
-                        classLoader = Reflections.loadJar(file);
-                    } catch (MalformedURLException e) {
-                        throw new ParameterException(
-                                "Failed to load user jar " + file + " with error " + e.getMessage());
-                    }
-                    (new ImplementsClassesValidator(Function.class, java.util.function.Function.class))
-                            .validateField("className", functionConfig.getClassName(), classLoader);
-                }
+            if (!isBlank(functionConfig.getPy()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) &&
+                    !new File(functionConfig.getPy()).exists()) {
+                throw new ParameterException("The specified jar file does not exist");
             }
 
             try {
                 // Need to load jar and set context class loader before calling
-                ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), classLoader);
+                String functionPkgUrl = Utils.isFunctionPackageUrlSupported(userCodeFile) ? userCodeFile : null;
+                classLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, null);
             } catch (Exception e) {
                 throw new IllegalArgumentException(e.getMessage());
             }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 37d2d9bcdb..b8ba510574 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -22,8 +22,7 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.fileExists;
-import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
+import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -37,6 +36,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -56,7 +56,6 @@
 import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
-import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO sinks (egress data from Pulsar)")
@@ -439,55 +438,18 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
                 throw new ParameterException("Sink archive not specfied");
             }
 
-            boolean isConnectorBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN);
-            boolean isArchivePathUrl = Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive());
-
-            String archivePath = null;
-            if (isArchivePathUrl) {
-                // download jar file if url is http
-                if(sinkConfig.getArchive().startsWith(Utils.HTTP)) {
-                    File tempPkgFile = null;
-                    try {
-                        tempPkgFile = downloadFromHttpUrl(sinkConfig.getArchive(), sinkConfig.getName());
-                        archivePath = tempPkgFile.getAbsolutePath();
-                    } catch(Exception e) {
-                        if(tempPkgFile!=null ) {
-                            tempPkgFile.deleteOnExit();
-                        }
-                        throw new ParameterException("Failed to download archive from " + sinkConfig.getArchive()
-                                + ", due to =" + e.getMessage());
-                    }
-                }
-            } else if (isConnectorBuiltin) {
-                // Ignore local checks when submitting built-in connector
-                archivePath = null;
-            } else {
-                archivePath = sinkConfig.getArchive();
-            }
-
-            // if jar file is present locally then load jar and validate SinkClass in it
-            if (archivePath != null) {
-                if (!fileExists(archivePath)) {
-                    throw new ParameterException("Archive file " + archivePath + " does not exist");
-                }
-
-                try {
-                    ConnectorDefinition connector = ConnectorUtils.getConnectorDefinition(archivePath);
-                    log.info("Connector: {}", connector);
-                } catch (IOException e) {
-                    throw new ParameterException("Connector from " + archivePath + " has error: " + e.getMessage());
-                }
-
-                try {
-                    classLoader = NarClassLoader.getFromArchive(new File(archivePath), Collections.emptySet());
-                } catch (IOException e) {
-                    throw new IllegalArgumentException(e);
+            if (!Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) &&
+                    !sinkConfig.getArchive().startsWith(BUILTIN)) {
+                if (!new File(sinkConfig.getArchive()).exists()) {
+                    throw new IllegalArgumentException(String.format("Sink Archive file %s does not exist", sinkConfig.getArchive()));
                 }
             }
 
             try {
                 // Need to load jar and set context class loader before calling
-                ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), classLoader);
+                String sourcePkgUrl = Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) ? sinkConfig.getArchive() : null;
+                Path archivePath = (Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) || sinkConfig.getArchive().startsWith(BUILTIN)) ? null : new File(sinkConfig.getArchive()).toPath();
+                classLoader = SinkConfigUtils.validate(sinkConfig, archivePath, sourcePkgUrl, null);
             } catch (Exception e) {
                 throw new ParameterException(e.getMessage());
             }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 5a1e9b3817..f295030fe8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -22,8 +22,7 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.fileExists;
-import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
+import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -37,8 +36,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,7 +61,6 @@
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
-import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO Sources (ingress data into Pulsar)")
@@ -397,58 +395,18 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) {
             if (isBlank(sourceConfig.getArchive())) {
                 throw new ParameterException("Source archive not specfied");
             }
-
-            boolean isConnectorBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN);
-            boolean isArchivePathUrl = Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive());
-
-            String archivePath = null;
-            if (isArchivePathUrl) {
-                // download archive file if url is http
-                if(sourceConfig.getArchive().startsWith(Utils.HTTP)) {
-                    File tempPkgFile = null;
-                    try {
-                        tempPkgFile = downloadFromHttpUrl(sourceConfig.getArchive(), sourceConfig.getName());
-                        archivePath = tempPkgFile.getAbsolutePath();
-                    } catch(Exception e) {
-                        if(tempPkgFile!=null ) {
-                            tempPkgFile.deleteOnExit();
-                        }
-                        throw new ParameterException("Failed to download archive from " + sourceConfig.getArchive()
-                                + ", due to =" + e.getMessage());
-                    }
-                }
-            } else if (isConnectorBuiltin) {
-                // Ignore local checks when submitting built-in connector
-                archivePath = null;
-            } else {
-                archivePath = sourceConfig.getArchive();
-            }
-
-
-            // if jar file is present locally then load jar and validate SinkClass in it
-            if (archivePath != null) {
-                if (!fileExists(archivePath)) {
-                    throw new ParameterException("Archive file " + archivePath + " does not exist");
-                }
-
-                try {
-                    ConnectorDefinition connector = ConnectorUtils.getConnectorDefinition(archivePath);
-                    log.info("Connector: {}", connector);
-                } catch (IOException e) {
-                    throw new ParameterException("Connector from " + archivePath + " has error: " + e.getMessage());
-                }
-
-                try {
-                    classLoader = NarClassLoader.getFromArchive(new File(archivePath),
-                            Collections.emptySet());
-                } catch (IOException e) {
-                    throw new IllegalArgumentException(e);
+            if (!Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) &&
+                !sourceConfig.getArchive().startsWith(BUILTIN)) {
+                if (!new File(sourceConfig.getArchive()).exists()) {
+                    throw new IllegalArgumentException(String.format("Source Archive %s does not exist", sourceConfig.getArchive()));
                 }
             }
 
             try {
              // Need to load jar and set context class loader before calling
-                ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), classLoader);
+                String sourcePkgUrl = Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) ? sourceConfig.getArchive() : null;
+                Path archivePath = (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) || sourceConfig.getArchive().startsWith(BUILTIN)) ? null : new File(sourceConfig.getArchive()).toPath();
+                classLoader = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePkgUrl, null);
             } catch (Exception e) {
                 throw new ParameterException(e.getMessage());
             }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index b52bc17498..a050997894 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -40,10 +40,7 @@
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.Sink;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Resources;
-import org.apache.pulsar.functions.utils.SinkConfig;
+import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.io.cassandra.CassandraStringSink;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
@@ -124,7 +121,7 @@ public void setup() throws Exception {
         }
         JAR_FILE_PATH = file.getFile();
         WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile();
-        Thread.currentThread().setContextClassLoader(Reflections.loadJar(new File(JAR_FILE_PATH)));
+        Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
     }
 
     public SinkConfig getSinkConfig() {
@@ -211,7 +208,7 @@ public void testMissingNamespace() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'name' cannot be null!")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink name cannot be null")
     public void testMissingName() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
         sinkConfig.setName(null);
@@ -368,7 +365,7 @@ public void testMissingParallelism() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'parallelism' must be a Positive Number")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
     public void testNegativeParallelism() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
         sinkConfig.setParallelism(-1);
@@ -390,7 +387,7 @@ public void testNegativeParallelism() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'parallelism' must be a Positive Number")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
     public void testZeroParallelism() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
         sinkConfig.setParallelism(0);
@@ -456,7 +453,7 @@ public void testInvalidJarWithNoSource() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Archive file /tmp/foo.jar" +
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Sink Archive file /tmp/foo.jar" +
             " does not exist")
     public void testInvalidJar() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
@@ -673,7 +670,7 @@ public void testCmdSinkConfigFileMissingNamespace() throws Exception {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'name' cannot be null!")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink name cannot be null")
     public void testCmdSinkConfigFileMissingName() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
         testSinkConfig.setName(null);
@@ -722,7 +719,7 @@ public void testCmdSinkConfigFileMissingConfig() throws Exception {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'parallelism' must be a Positive Number")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
     public void testCmdSinkConfigFileZeroParallelism() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
         testSinkConfig.setParallelism(0);
@@ -732,7 +729,7 @@ public void testCmdSinkConfigFileZeroParallelism() throws Exception {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'parallelism' must be a Positive Number")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
     public void testCmdSinkConfigFileNegativeParallelism() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
         testSinkConfig.setParallelism(-1);
@@ -772,7 +769,7 @@ public void testCmdSinkConfigFileMissingJar() throws Exception {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Archive file /tmp/foo.jar does not exist")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Sink Archive file /tmp/foo.jar does not exist")
     public void testCmdSinkConfigFileInvalidJar() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
         testSinkConfig.setArchive("/tmp/foo.jar");
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 4a3b3cc2fa..318dbe65cf 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -38,10 +38,7 @@
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.Source;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Resources;
-import org.apache.pulsar.functions.utils.SourceConfig;
+import org.apache.pulsar.functions.utils.*;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -104,7 +101,7 @@ public void setup() throws Exception {
         PowerMockito.doNothing().when(CmdFunctions.class, "startLocalRun", Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
         JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
         WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile();
-        Thread.currentThread().setContextClassLoader(Reflections.loadJar(new File(JAR_FILE_PATH)));
+        Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
     }
 
     public SourceConfig getSourceConfig() {
@@ -179,7 +176,7 @@ public void testMissingNamespace() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'name' cannot be null!")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source name cannot be null")
     public void testMissingName() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         sourceConfig.setName(null);
@@ -198,7 +195,7 @@ public void testMissingName() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'topicName' cannot be null!")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Topic name cannot be null")
     public void testMissingTopicName() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         sourceConfig.setTopicName(null);
@@ -278,7 +275,7 @@ public void testMissingParallelism() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'parallelism' must be a Positive Number")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
     public void testNegativeParallelism() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         sourceConfig.setParallelism(-1);
@@ -297,7 +294,7 @@ public void testNegativeParallelism() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'parallelism' must be a Positive Number")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
     public void testZeroParallelism() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         sourceConfig.setParallelism(0);
@@ -335,7 +332,7 @@ public void testMissingArchive() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Archive file /tmp/foo.jar does not exist")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Source Archive /tmp/foo.jar does not exist")
     public void testInvalidJar() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
         String fakeJar = "/tmp/foo.jar";
@@ -550,7 +547,7 @@ public void testCmdSourceConfigFileMissingNamespace() throws Exception {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'name' cannot be null!")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source name cannot be null")
     public void testCmdSourceConfigFileMissingName() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setName(null);
@@ -560,7 +557,7 @@ public void testCmdSourceConfigFileMissingName() throws Exception {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'topicName' cannot be null!")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Topic name cannot be null")
     public void testCmdSourceConfigFileMissingTopicName() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setTopicName(null);
@@ -590,7 +587,7 @@ public void testCmdSourceConfigFileMissingConfig() throws Exception {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'parallelism' must be a Positive Number")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
     public void testCmdSourceConfigFileZeroParallelism() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setParallelism(0);
@@ -600,7 +597,7 @@ public void testCmdSourceConfigFileZeroParallelism() throws Exception {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Field 'parallelism' must be a Positive Number")
+    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
     public void testCmdSourceConfigFileNegativeParallelism() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setParallelism(-1);
@@ -640,7 +637,7 @@ public void testCmdSourceConfigFileMissingJar() throws Exception {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Archive file /tmp/foo.jar does not exist")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Source Archive /tmp/foo.jar does not exist")
     public void testCmdSourceConfigFileInvalidJar() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
         testSourceConfig.setArchive("/tmp/foo.jar");
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index c6262d3c33..73c909be23 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -35,7 +35,6 @@
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.WindowConfig;
-import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
 import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
 import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
 import org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy;
@@ -100,7 +99,6 @@ private WindowConfig getWindowConfigs(Context context) {
 
 
         WindowUtils.inferDefaultConfigs(windowConfig);
-        ValidatorImpls.WindowConfigValidator.validateWindowConfig(windowConfig);
         return windowConfig;
     }
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index da066cc0af..d12a7cf2b8 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -218,12 +218,6 @@ public void testExecuteWithLateTupleStream() throws Exception {
     @Test
     public void testSettingSlidingCountWindow() throws Exception {
         final Object[][] args = new Object[][]{
-                {-1, 10},
-                {10, -1},
-                {0, 10},
-                {10, 0},
-                {0, 0},
-                {-1, -1},
                 {5, 10},
                 {1, 1},
                 {10, 5},
@@ -231,11 +225,7 @@ public void testSettingSlidingCountWindow() throws Exception {
                 {100, 100},
                 {200, 100},
                 {500, 100},
-                {null, null},
-                {null, 1},
                 {1, null},
-                {null, -1},
-                {-1, null}
         };
 
         for (Object[] arg : args) {
@@ -311,12 +301,6 @@ public void testSettingSlidingCountWindow() throws Exception {
     @Test
     public void testSettingSlidingTimeWindow() throws Exception {
         final Object[][] args = new Object[][]{
-                {-1L, 10L},
-                {10L, -1L},
-                {0L, 10L},
-                {10L, 0L},
-                {0L, 0L},
-                {-1L, -1L},
                 {5L, 10L},
                 {1L, 1L},
                 {10L, 5L},
@@ -324,11 +308,7 @@ public void testSettingSlidingTimeWindow() throws Exception {
                 {100L, 100L},
                 {200L, 100L},
                 {500L, 100L},
-                {null, null},
-                {null, 1L},
                 {1L, null},
-                {null, -1L},
-                {-1L, null}
         };
 
         for (Object[] arg : args) {
@@ -400,7 +380,7 @@ public void testSettingSlidingTimeWindow() throws Exception {
 
     @Test
     public void testSettingTumblingCountWindow() throws Exception {
-        final Object[] args = new Object[]{-1, 0, 1, 2, 5, 10, null};
+        final Object[] args = new Object[]{1, 2, 5, 10};
 
         for (Object arg : args) {
             Object arg0 = arg;
@@ -454,7 +434,7 @@ public void testSettingTumblingCountWindow() throws Exception {
 
     @Test
     public void testSettingTumblingTimeWindow() throws Exception {
-        final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+        final Object[] args = new Object[]{1L, 2L, 5L, 10L};
         for (Object arg : args) {
             Object arg0 = arg;
             try {
@@ -507,7 +487,7 @@ public void testSettingTumblingTimeWindow() throws Exception {
 
     @Test
     public void testSettingLagTime() throws Exception {
-        final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+        final Object[] args = new Object[]{0L, 1L, 2L, 5L, 10L, null};
         for (Object arg : args) {
             Object arg0 = arg;
             try {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index e173bc9b7b..e206440a11 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -29,26 +29,12 @@
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.functions.utils.validation.ConfigValidation;
-
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidFunctionConfig;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidWindowConfig;
-import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
 
 @Getter
 @Setter
 @Data
 @EqualsAndHashCode
 @ToString
-@isValidFunctionConfig
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class FunctionConfig {
 
@@ -63,24 +49,13 @@
         PYTHON
     }
 
-
-    @NotNull
     private String tenant;
-    @NotNull
     private String namespace;
-    @NotNull
     private String name;
-    @NotNull
     private String className;
-    @isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
     private Collection<String> inputs;
-    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA)
-    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON)
     private Map<String, String> customSerdeInputs;
-    @isValidTopicName
     private String topicsPattern;
-    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON)
     private Map<String, String> customSchemaInputs;
 
     /**
@@ -88,7 +63,6 @@
      */
     private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();
 
-    @isValidTopicName
     private String output;
 
     /**
@@ -97,9 +71,7 @@
      */
     private String outputSchemaType;
 
-    @ConfigValidationAnnotations.isValidSerde
     private String outputSerdeClassName;
-    @isValidTopicName
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
     private boolean retainOrdering;
@@ -109,17 +81,11 @@
     private int maxMessageRetries = -1;
     private String deadLetterTopic;
     private String subName;
-    @isPositiveNumber
     private int parallelism = 1;
-    @isValidResources
     private Resources resources;
     private String fqfn;
-    @isValidWindowConfig
     private WindowConfig windowConfig;
-    @isPositiveNumber
     private Long timeoutMs;
-    @isFileExists
     private String jar;
-    @isFileExists
     private String py;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 11b623d358..fa6621453d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -22,19 +22,22 @@
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 
+import java.io.File;
 import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.Map;
+import java.net.MalformedURLException;
+import java.util.*;
 
 import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
+import static org.apache.pulsar.functions.utils.Utils.loadJar;
 
 public class FunctionConfigUtils {
-
     public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
             throws IllegalArgumentException {
 
@@ -277,4 +280,237 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
 
         return functionConfig;
     }
+
+    private static void doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
+        Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig, clsLoader);
+        // inputs use default schema, so there is no check needed there
+
+        // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
+        // implements SerDe class
+        if (functionConfig.getCustomSerdeInputs() != null) {
+            functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
+                ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], clsLoader, true);
+            });
+        }
+
+        // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
+        // implements SerDe class
+        if (functionConfig.getCustomSchemaInputs() != null) {
+            functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> {
+                ValidatorUtils.validateSchema(schemaType, typeArgs[0], clsLoader, true);
+            });
+        }
+
+        // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
+        // implements Schema or SerDe classes
+
+        if (functionConfig.getInputSpecs() != null) {
+            functionConfig.getInputSpecs().forEach((topicName, conf) -> {
+                // Need to make sure that one and only one of schema/serde is set
+                if (!isEmpty(conf.getSchemaType()) && !isEmpty(conf.getSerdeClassName())) {
+                    throw new IllegalArgumentException(
+                            String.format("Only one of schemaType or serdeClassName should be set in inputSpec"));
+                }
+                if (!isEmpty(conf.getSerdeClassName())) {
+                    ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0], clsLoader, true);
+                }
+                if (!isEmpty(conf.getSchemaType())) {
+                    ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0], clsLoader, true);
+                }
+            });
+        }
+
+        if (Void.class.equals(typeArgs[1])) {
+            return;
+        }
+
+        // One and only one of outputSchemaType and outputSerdeClassName should be set
+        if (!isEmpty(functionConfig.getOutputSerdeClassName()) && !isEmpty(functionConfig.getOutputSchemaType())) {
+            throw new IllegalArgumentException(
+                    String.format("Only one of outputSchemaType or outputSerdeClassName should be set"));
+        }
+
+        if (!isEmpty(functionConfig.getOutputSchemaType())) {
+            ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], clsLoader, false);
+        }
+
+        if (!isEmpty(functionConfig.getOutputSerdeClassName())) {
+            ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], clsLoader, false);
+        }
+
+    }
+
+    private static void doPythonChecks(FunctionConfig functionConfig) {
+        if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+            throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
+        }
+
+        if (functionConfig.getWindowConfig() != null) {
+            throw new IllegalArgumentException("There is currently no support windowing in python");
+        }
+
+        if (functionConfig.getMaxMessageRetries() >= 0) {
+            throw new IllegalArgumentException("Message retries not yet supported in python");
+        }
+    }
+
+    private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
+        if (inputTopics.contains(outputTopic)) {
+            throw new IllegalArgumentException(
+                    String.format("Output topic %s is also being used as an input topic (topics must be one or the other)",
+                            outputTopic));
+        }
+    }
+
+    private static void doCommonChecks(FunctionConfig functionConfig) {
+        if (isEmpty(functionConfig.getTenant())) {
+            throw new IllegalArgumentException("Function tenant cannot be null");
+        }
+        if (isEmpty(functionConfig.getNamespace())) {
+            throw new IllegalArgumentException("Function namespace cannot be null");
+        }
+        if (isEmpty(functionConfig.getName())) {
+            throw new IllegalArgumentException("Function name cannot be null");
+        }
+        if (isEmpty(functionConfig.getClassName())) {
+            throw new IllegalArgumentException("Function classname cannot be null");
+        }
+
+        Collection<String> allInputTopics = collectAllInputTopics(functionConfig);
+        if (allInputTopics.isEmpty()) {
+            throw new IllegalArgumentException("No input topic(s) specified for the function");
+        }
+        for (String topic : allInputTopics) {
+            if (!TopicName.isValid(topic)) {
+                throw new IllegalArgumentException(String.format("Input topic %s is invalid", topic));
+            }
+        }
+
+        if (!isEmpty(functionConfig.getOutput())) {
+            if (!TopicName.isValid(functionConfig.getOutput())) {
+                throw new IllegalArgumentException(String.format("Output topic %s is invalid", functionConfig.getOutput()));
+            }
+        }
+
+        if (!isEmpty(functionConfig.getLogTopic())) {
+            if (!TopicName.isValid(functionConfig.getLogTopic())) {
+                throw new IllegalArgumentException(String.format("LogTopic topic %s is invalid", functionConfig.getLogTopic()));
+            }
+        }
+
+        if (!isEmpty(functionConfig.getDeadLetterTopic())) {
+            if (!TopicName.isValid(functionConfig.getDeadLetterTopic())) {
+                throw new IllegalArgumentException(String.format("DeadLetter topic %s is invalid", functionConfig.getDeadLetterTopic()));
+            }
+        }
+
+        if (functionConfig.getParallelism() <= 0) {
+            throw new IllegalArgumentException("Function parallelism should positive number");
+        }
+        // Ensure that topics aren't being used as both input and output
+        verifyNoTopicClash(allInputTopics, functionConfig.getOutput());
+
+        WindowConfig windowConfig = functionConfig.getWindowConfig();
+        if (windowConfig != null) {
+            // set auto ack to false since windowing framework is responsible
+            // for acking and not the function framework
+            if (functionConfig.isAutoAck() == true) {
+                throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+            }
+            WindowConfigUtils.validate(windowConfig);
+        }
+
+        if (functionConfig.getResources() != null) {
+            ResourceConfigUtils.validate(functionConfig.getResources());
+        }
+
+        if (functionConfig.getTimeoutMs() != null && functionConfig.getTimeoutMs() <= 0) {
+            throw new IllegalArgumentException("Function timeout must be a positive number");
+        }
+
+        if (functionConfig.getTimeoutMs() != null
+                && functionConfig.getProcessingGuarantees() != null
+                && functionConfig.getProcessingGuarantees() != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
+            throw new IllegalArgumentException("Message timeout can only be specified with processing guarantee is "
+                    + FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
+        }
+
+        if (functionConfig.getMaxMessageRetries() >= 0
+                && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+            throw new IllegalArgumentException("MaxMessageRetries and Effectively once don't gel well");
+        }
+        if (functionConfig.getMaxMessageRetries() < 0 && !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
+            throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
+        }
+
+        if (!isEmpty(functionConfig.getJar()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getJar())
+                && functionConfig.getJar().startsWith(BUILTIN)) {
+            if (!new File(functionConfig.getJar()).exists()) {
+                throw new IllegalArgumentException("The supplied jar file does not exist");
+            }
+        }
+        if (!isEmpty(functionConfig.getPy()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
+                && functionConfig.getPy().startsWith(BUILTIN)) {
+            if (!new File(functionConfig.getPy()).exists()) {
+                throw new IllegalArgumentException("The supplied python file does not exist");
+            }
+        }
+    }
+
+    private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {
+        List<String> retval = new LinkedList<>();
+        if (functionConfig.getInputs() != null) {
+            retval.addAll(functionConfig.getInputs());
+        }
+        if (functionConfig.getTopicsPattern() != null) {
+            retval.add(functionConfig.getTopicsPattern());
+        }
+        if (functionConfig.getCustomSerdeInputs() != null) {
+            retval.addAll(functionConfig.getCustomSerdeInputs().keySet());
+        }
+        if (functionConfig.getCustomSchemaInputs() != null) {
+            retval.addAll(functionConfig.getCustomSchemaInputs().keySet());
+        }
+        if (functionConfig.getInputSpecs() != null) {
+            retval.addAll(functionConfig.getInputSpecs().keySet());
+        }
+        return retval;
+    }
+
+    public static ClassLoader validate(FunctionConfig functionConfig, String functionPkgUrl, File uploadedInputStreamAsFile) {
+        doCommonChecks(functionConfig);
+        if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+            ClassLoader classLoader = null;
+            if (org.apache.commons.lang3.StringUtils.isNotBlank(functionPkgUrl)) {
+                try {
+                    classLoader = Utils.extractClassLoader(functionPkgUrl);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Corrupted Jar File", e);
+                }
+            } else if (uploadedInputStreamAsFile != null) {
+                try {
+                    classLoader = loadJar(uploadedInputStreamAsFile);
+                } catch (MalformedURLException e) {
+                    throw new IllegalArgumentException("Corrupted Jar File", e);
+                }
+            } else if (!isEmpty(functionConfig.getJar())) {
+                File jarFile = new File(functionConfig.getJar());
+                if (!jarFile.exists()) {
+                    throw new IllegalArgumentException("Jar file does not exist");
+                }
+                try {
+                    classLoader = loadJar(jarFile);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Corrupted Jar File", e);
+                }
+            } else {
+                throw new IllegalArgumentException("Function Package is not provided");
+            }
+            doJavaChecks(functionConfig, classLoader);
+            return classLoader;
+        } else {
+            doPythonChecks(functionConfig);
+            return null;
+        }
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 611688967a..67dde1cb0f 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -22,8 +22,6 @@
 import java.lang.reflect.Array;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.HashMap;
 import java.util.Map;
@@ -167,23 +165,12 @@ public static Object createInstance(String userClassName,
 
     public static Object createInstance(String userClassName, java.io.File jar) {
         try {
-            return createInstance(userClassName, loadJar(jar));
+            return createInstance(userClassName, Utils.loadJar(jar));
         } catch (Exception ex) {
             return null;
         }
     }
 
-    /**
-     * Load a jar
-     * @param jar file of jar
-     * @return classloader
-     * @throws MalformedURLException
-     */
-    public static ClassLoader loadJar(java.io.File jar) throws MalformedURLException {
-        java.net.URL url = jar.toURI().toURL();
-        return new URLClassLoader(new URL[]{url});
-    }
-
     /**
      * Check if a class is in a jar
      * @param jar location of the jar
@@ -192,7 +179,7 @@ public static ClassLoader loadJar(java.io.File jar) throws MalformedURLException
      */
     public static boolean classExistsInJar(java.io.File jar, String fqcn) {
         try {
-            java.net.URLClassLoader loader = (URLClassLoader) loadJar(jar);
+            java.net.URLClassLoader loader = (URLClassLoader) Utils.loadJar(jar);
             Class.forName(fqcn, false, loader);
             loader.close();
             return true;
@@ -224,7 +211,7 @@ public static boolean classExists(String fqcn) {
     public static boolean classInJarImplementsIface(java.io.File jar, String fqcn, Class xface) {
         boolean ret = false;
         try {
-            java.net.URLClassLoader loader = (URLClassLoader) loadJar(jar);
+            java.net.URLClassLoader loader = (URLClassLoader) Utils.loadJar(jar);
             if (xface.isAssignableFrom(Class.forName(fqcn, false, loader))){
                 ret = true;
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
similarity index 52%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
rename to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
index ab8782b546..fc80b34d4c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
@@ -16,16 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.utils.validation;
 
-import java.util.Map;
+package org.apache.pulsar.functions.utils;
 
-public abstract class Validator {
-    public Validator(Map<String, Object> params) {
+public class ResourceConfigUtils {
+    public static void validate(Resources resources) {
+        Double cpu = resources.getCpu();
+        Long ram = resources.getRam();
+        Long disk = resources.getDisk();
+        com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0.0,
+                "The cpu allocation for the function must be positive");
+        com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0L,
+                "The ram allocation for the function must be positive");
+        com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0L,
+                "The disk allocation for the function must be positive");
     }
-
-    public Validator() {
-    }
-
-    public abstract void validateField(String name, Object o, ClassLoader classLoader);
-}
+}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index 1132fa6b86..1964c1e909 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -28,60 +28,36 @@
 import lombok.Setter;
 import lombok.ToString;
 
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidSinkConfig;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
-import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
-
 @Getter
 @Setter
 @Data
 @EqualsAndHashCode
 @ToString
-@isValidSinkConfig
 public class SinkConfig {
 
-
-    @NotNull
     private String tenant;
-    @NotNull
     private String namespace;
-    @NotNull
     private String name;
     private String className;
     private String sourceSubscriptionName;
 
-    @ConfigValidationAnnotations.isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
     private Collection<String> inputs;
 
-    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
     private Map<String, String> topicToSerdeClassName;
 
-    @isValidTopicName
     private String topicsPattern;
 
-    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class })
     private Map<String, String> topicToSchemaType;
 
     private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();
 
     private Map<String, Object> configs;
-    @isPositiveNumber
     private int parallelism = 1;
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     private boolean retainOrdering;
-    @isValidResources
     private Resources resources;
     private boolean autoAck;
-    @isPositiveNumber
     private Long timeoutMs;
 
-    @isFileExists
     private String archive;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 545d34419d..4930b14cdc 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -22,6 +22,7 @@
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
@@ -31,14 +32,14 @@
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.nio.file.Path;
+import java.util.*;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.Utils.getSinkType;
 
 public class SinkConfigUtils {
 
@@ -59,7 +60,7 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas
                 sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class
             } else {
                 sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
-                typeArg = Utils.getSinkType(sinkClassName, classLoader).getName();
+                typeArg = getSinkType(sinkClassName, classLoader).getName();
             }
         }
 
@@ -232,4 +233,106 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
 
         return sinkConfig;
     }
+
+    public static NarClassLoader validate(SinkConfig sinkConfig, Path archivePath, String functionPkgUrl,
+                                          File uploadedInputStreamAsFile) {
+        if (isEmpty(sinkConfig.getTenant())) {
+            throw new IllegalArgumentException("Sink tenant cannot be null");
+        }
+        if (isEmpty(sinkConfig.getNamespace())) {
+            throw new IllegalArgumentException("Sink namespace cannot be null");
+        }
+        if (isEmpty(sinkConfig.getName())) {
+            throw new IllegalArgumentException("Sink name cannot be null");
+        }
+
+        // make we sure we have one source of input
+        Collection<String> allInputs = collectAllInputTopics(sinkConfig);
+        if (allInputs.isEmpty()) {
+            throw new IllegalArgumentException("Must specify at least one topic of input via topicToSerdeClassName, " +
+                    "topicsPattern, topicToSchemaType or inputSpecs");
+        }
+        for (String topic : allInputs) {
+            if (!TopicName.isValid(topic)) {
+                throw new IllegalArgumentException(String.format("Input topic %s is invalid", topic));
+            }
+        }
+
+        if (sinkConfig.getParallelism() <= 0) {
+            throw new IllegalArgumentException("Sink parallelism should positive number");
+        }
+
+        if (sinkConfig.getResources() != null) {
+            ResourceConfigUtils.validate(sinkConfig.getResources());
+        }
+
+        if (sinkConfig.getTimeoutMs() != null && sinkConfig.getTimeoutMs() <= 0) {
+            throw new IllegalArgumentException("Sink timeout must be a positive number");
+        }
+
+        NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+        if (classLoader == null) {
+            // This happens at the cli for builtin. There is no need to check this since
+            // the actual check will be done at serverside
+            return null;
+        }
+
+        String sinkClassName;
+        try {
+            sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+        } catch (IOException e1) {
+            throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
+        }
+        Class<?> typeArg = getSinkType(sinkClassName, classLoader);
+
+        if (sinkConfig.getTopicToSerdeClassName() != null) {
+            sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
+                ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
+            });
+        }
+
+        if (sinkConfig.getTopicToSchemaType() != null) {
+            sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
+                ValidatorUtils.validateSchema(schemaType, typeArg, classLoader, true);
+            });
+        }
+
+        // topicsPattern does not need checks
+
+        if (sinkConfig.getInputSpecs() != null) {
+            sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
+                // Only one is set
+                if (!isEmpty(consumerSpec.getSerdeClassName()) && !isEmpty(consumerSpec.getSchemaType())) {
+                    throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
+                }
+                if (!isEmpty(consumerSpec.getSerdeClassName())) {
+                    ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg, classLoader, true);
+                }
+                if (!isEmpty(consumerSpec.getSchemaType())) {
+                    ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, classLoader, true);
+                }
+            });
+        }
+        return classLoader;
+    }
+
+    private static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
+        List<String> retval = new LinkedList<>();
+        if (sinkConfig.getInputs() != null) {
+            retval.addAll(sinkConfig.getInputs());
+        }
+        if (sinkConfig.getTopicToSerdeClassName() != null) {
+            retval.addAll(sinkConfig.getTopicToSerdeClassName().keySet());
+        }
+        if (sinkConfig.getTopicsPattern() != null) {
+            retval.add(sinkConfig.getTopicsPattern());
+        }
+        if (sinkConfig.getTopicToSchemaType() != null) {
+            retval.addAll(sinkConfig.getTopicToSchemaType().keySet());
+        }
+        if (sinkConfig.getInputSpecs() != null) {
+            retval.addAll(sinkConfig.getInputSpecs().keySet());
+        }
+        return retval;
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
index dfad9155e0..901b0c1203 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
@@ -23,13 +23,6 @@
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidSourceConfig;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
 
 import java.util.Map;
 
@@ -38,32 +31,22 @@
 @Data
 @EqualsAndHashCode
 @ToString
-@isValidSourceConfig
 public class SourceConfig {
-    @NotNull
     private String tenant;
-    @NotNull
     private String namespace;
-    @NotNull
     private String name;
     private String className;
 
-    @NotNull
-    @isValidTopicName
     private String topicName;
 
-    @ConfigValidationAnnotations.isValidSerde
     private String serdeClassName;
 
     private String schemaType;
 
     private Map<String, Object> configs;
-    @isPositiveNumber
     private int parallelism = 1;
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
-    @isValidResources
     private Resources resources;
 
-    @isFileExists
     private String archive;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 3424062839..80287b114f 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -22,16 +22,20 @@
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.nio.file.Path;
 import java.util.Map;
 
+import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 
@@ -167,4 +171,57 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
         }
         return sourceConfig;
     }
+
+    public static NarClassLoader validate(SourceConfig sourceConfig, Path archivePath, String functionPkgUrl, File uploadedInputStreamAsFile) {
+        if (isEmpty(sourceConfig.getTenant())) {
+            throw new IllegalArgumentException("Source tenant cannot be null");
+        }
+        if (isEmpty(sourceConfig.getNamespace())) {
+            throw new IllegalArgumentException("Source namespace cannot be null");
+        }
+        if (isEmpty(sourceConfig.getName())) {
+            throw new IllegalArgumentException("Source name cannot be null");
+        }
+        if (isEmpty(sourceConfig.getTopicName())) {
+            throw new IllegalArgumentException("Topic name cannot be null");
+        }
+        if (!TopicName.isValid(sourceConfig.getTopicName())) {
+            throw new IllegalArgumentException("Topic name is invalid");
+        }
+        if (sourceConfig.getParallelism() <= 0) {
+            throw new IllegalArgumentException("Source parallelism should positive number");
+        }
+        if (sourceConfig.getResources() != null) {
+            ResourceConfigUtils.validate(sourceConfig.getResources());
+        }
+
+        NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+        if (classLoader == null) {
+            // This happens at the cli for builtin. There is no need to check this since
+            // the actual check will be done at serverside
+            return null;
+        }
+
+        String sourceClassName;
+        try {
+            sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
+        } catch (IOException e1) {
+            throw new IllegalArgumentException("Failed to extract source class from archive", e1);
+        }
+
+        Class<?> typeArg = getSourceType(sourceClassName, classLoader);
+
+        // Only one of serdeClassName or schemaType should be set
+        if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
+            throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
+        }
+
+        if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
+            ValidatorUtils.validateSerde(sourceConfig.getSerdeClassName(), typeArg, classLoader, false);
+        }
+        if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
+            ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, classLoader, false);
+        }
+        return classLoader;
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index adeaee162e..91f4835a55 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -21,14 +21,22 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
-import java.net.ServerSocket;
+import java.net.*;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Path;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
 import org.apache.pulsar.io.core.Sink;
@@ -217,4 +225,125 @@ public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) {
         return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(Utils.HTTP)
                 || functionPkgUrl.startsWith(Utils.FILE));
     }
+
+    /**
+     * Load a jar
+     * @param jar file of jar
+     * @return classloader
+     * @throws MalformedURLException
+     */
+    public static ClassLoader loadJar(File jar) throws MalformedURLException {
+        java.net.URL url = jar.toURI().toURL();
+        return new URLClassLoader(new URL[]{url});
+    }
+
+    public static ClassLoader extractClassLoader(String destPkgUrl) throws IOException, URISyntaxException {
+        if (destPkgUrl.startsWith(FILE)) {
+            URL url = new URL(destPkgUrl);
+            File file = new File(url.toURI());
+            if (!file.exists()) {
+                throw new IOException(destPkgUrl + " does not exists locally");
+            }
+            try {
+                return loadJar(file);
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException(
+                        "Corrupt User PackageFile " + file + " with error " + e.getMessage());
+            }
+        } else if (destPkgUrl.startsWith("http")) {
+            URL website = new URL(destPkgUrl);
+            File tempFile = File.createTempFile("function", ".tmp");
+            ReadableByteChannel rbc = Channels.newChannel(website.openStream());
+            try (FileOutputStream fos = new FileOutputStream(tempFile)) {
+                fos.getChannel().transferFrom(rbc, 0, 10);
+            }
+            if (tempFile.exists()) {
+                tempFile.delete();
+            }
+            return loadJar(tempFile);
+        } else {
+            throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
+        }
+    }
+
+    public static void implementsClass(String className, Class<?> klass, ClassLoader classLoader) {
+        Class<?> objectClass;
+        try {
+            objectClass = loadClass(className, classLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Cannot find/load class " + className);
+        }
+
+        if (!klass.isAssignableFrom(objectClass)) {
+            throw new IllegalArgumentException(
+                    String.format("%s does not implement %s", className, klass.getName()));
+        }
+    }
+
+    public static Class<?> loadClass(String className, ClassLoader classLoader) throws ClassNotFoundException {
+        Class<?> objectClass;
+        try {
+            objectClass = Class.forName(className);
+        } catch (ClassNotFoundException e) {
+            if (classLoader != null) {
+                objectClass = classLoader.loadClass(className);
+            } else {
+                throw e;
+            }
+        }
+        return objectClass;
+    }
+
+    public static NarClassLoader extractNarClassLoader(Path archivePath, String pkgUrl, File uploadedInputStreamFileName) {
+        if (archivePath != null) {
+            try {
+                return NarClassLoader.getFromArchive(archivePath.toFile(),
+                            Collections.emptySet());
+            } catch (IOException e) {
+                throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath));
+            }
+        }
+        if (!StringUtils.isEmpty(pkgUrl)) {
+            if (pkgUrl.startsWith(FILE)) {
+                try {
+                    URL url = new URL(pkgUrl);
+                    File file = new File(url.toURI());
+                    if (!file.exists()) {
+                        throw new IOException(pkgUrl + " does not exists locally");
+                    }
+                    return NarClassLoader.getFromArchive(file, Collections.emptySet());
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(
+                            "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
+                }
+            } else if (pkgUrl.startsWith("http")) {
+                try {
+                    URL website = new URL(pkgUrl);
+                    File tempFile = File.createTempFile("function", ".tmp");
+                    ReadableByteChannel rbc = Channels.newChannel(website.openStream());
+                    try (FileOutputStream fos = new FileOutputStream(tempFile)) {
+                        fos.getChannel().transferFrom(rbc, 0, 10);
+                    }
+                    if (tempFile.exists()) {
+                        tempFile.delete();
+                    }
+                    return NarClassLoader.getFromArchive(tempFile, Collections.emptySet());
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(
+                            "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
+                }
+            } else {
+                throw new IllegalArgumentException("Unsupported url protocol "+ pkgUrl +", supported url protocols: [file/http/https]");
+            }
+        }
+        if (uploadedInputStreamFileName != null) {
+            try {
+                return NarClassLoader.getFromArchive(uploadedInputStreamFileName,
+                        Collections.emptySet());
+            } catch (IOException e) {
+                throw new IllegalArgumentException(e.getMessage());
+            }
+        }
+        return null;
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
new file mode 100644
index 0000000000..483ea08fa1
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import net.jodah.typetools.TypeResolver;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+
+import java.io.File;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.util.*;
+
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.pulsar.functions.utils.Utils.loadJar;
+
+public class ValidatorUtils {
+    private static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";
+
+    public static void validateSchema(String schemaType, Class<?> typeArg, ClassLoader clsLoader,
+                                      boolean input) {
+        if (isEmpty(schemaType) || getBuiltinSchemaType(schemaType) != null) {
+            // If it's empty, we use the default schema and no need to validate
+            // If it's built-in, no need to validate
+        } else {
+            Utils.implementsClass(schemaType, Schema.class, clsLoader);
+            validateSchemaType(schemaType, typeArg, clsLoader, input);
+        }
+    }
+
+    private static SchemaType getBuiltinSchemaType(String schemaTypeOrClassName) {
+        try {
+            return SchemaType.valueOf(schemaTypeOrClassName.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            // schemaType is not referring to builtin type
+            return null;
+        }
+    }
+
+    public static void validateSerde(String inputSerializer, Class<?> typeArg, ClassLoader clsLoader,
+                                     boolean deser) {
+        if (isEmpty(inputSerializer)) return;
+        if (inputSerializer.equals(DEFAULT_SERDE)) return;
+        try {
+            Class<?> serdeClass = Utils.loadClass(inputSerializer, clsLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException(
+                    String.format("The input serialization/deserialization class %s does not exist",
+                            inputSerializer));
+        }
+        Utils.implementsClass(inputSerializer, SerDe.class, clsLoader);
+
+        SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
+        if (serDe == null) {
+            throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
+                    inputSerializer));
+        }
+        Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+
+        // type inheritance information seems to be lost in generic type
+        // load the actual type class for verification
+        Class<?> fnInputClass;
+        Class<?> serdeInputClass;
+        try {
+            fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
+            serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Failed to load type class", e);
+        }
+
+        if (deser) {
+            if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+                throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+            }
+        } else {
+            if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
+                throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+            }
+        }
+    }
+
+    private static void validateSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader,
+                                           boolean input) {
+        Schema<?> schema = (Schema<?>) Reflections.createInstance(schemaClassName, clsLoader);
+        if (schema == null) {
+            throw new IllegalArgumentException(String.format("The Schema class %s does not exist",
+                    schemaClassName));
+        }
+        Class<?>[] schemaTypes = TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
+
+        // type inheritance information seems to be lost in generic type
+        // load the actual type class for verification
+        Class<?> fnInputClass;
+        Class<?> schemaInputClass;
+        try {
+            fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
+            schemaInputClass = Class.forName(schemaTypes[0].getName(), true, clsLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Failed to load type class", e);
+        }
+
+        if (input) {
+            if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
+                throw new IllegalArgumentException(
+                        "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+            }
+        } else {
+            if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
+                throw new IllegalArgumentException(
+                        "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+            }
+        }
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/WindowConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/WindowConfigUtils.java
new file mode 100644
index 0000000000..cf59a90439
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/WindowConfigUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+public class WindowConfigUtils {
+    public static void validate(WindowConfig windowConfig) {
+        if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) {
+            throw new IllegalArgumentException("Window length is not specified");
+        }
+
+        if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) {
+            throw new IllegalArgumentException(
+                    "Window length for time and count are set! Please set one or the other.");
+        }
+
+        if (windowConfig.getWindowLengthCount() != null) {
+            if (windowConfig.getWindowLengthCount() <= 0) {
+                throw new IllegalArgumentException(
+                        "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
+            }
+        }
+
+        if (windowConfig.getWindowLengthDurationMs() != null) {
+            if (windowConfig.getWindowLengthDurationMs() <= 0) {
+                throw new IllegalArgumentException(
+                        "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
+            }
+        }
+
+        if (windowConfig.getSlidingIntervalCount() != null) {
+            if (windowConfig.getSlidingIntervalCount() <= 0) {
+                throw new IllegalArgumentException(
+                        "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
+            }
+        }
+
+        if (windowConfig.getSlidingIntervalDurationMs() != null) {
+            if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
+                throw new IllegalArgumentException(
+                        "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
+            }
+        }
+
+        if (windowConfig.getTimestampExtractorClassName() != null) {
+            if (windowConfig.getMaxLagMs() != null) {
+                if (windowConfig.getMaxLagMs() < 0) {
+                    throw new IllegalArgumentException(
+                            "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
+                }
+            }
+            if (windowConfig.getWatermarkEmitIntervalMs() != null) {
+                if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
+                    throw new IllegalArgumentException(
+                            "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
deleted file mode 100644
index 0b600f4c78..0000000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.utils.validation;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.ACTUAL_RUNTIME;
-import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.TARGET_RUNTIME;
-import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS;
-
-@Slf4j
-public class ConfigValidation {
-
-    public enum Runtime {
-        ALL,
-        JAVA,
-        PYTHON
-    }
-
-    public static void validateConfig(Object config, String runtimeType, ClassLoader classLoader) {
-        for (Field field : config.getClass().getDeclaredFields()) {
-            Object value;
-            field.setAccessible(true);
-            try {
-                value = field.get(config);
-            } catch (IllegalAccessException e) {
-               throw new RuntimeException(e);
-            }
-            validateField(field, value, Runtime.valueOf(runtimeType), classLoader);
-        }
-        validateClass(config, Runtime.valueOf(runtimeType), classLoader);
-    }
-
-    private static void validateClass(Object config, Runtime runtime, ClassLoader classLoader) {
-
-        List<Annotation> annotationList = new LinkedList<>();
-        Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
-        for (Class clazz : classes) {
-            try {
-                Annotation[] anots = config.getClass().getAnnotationsByType(clazz);
-                annotationList.addAll(Arrays.asList(anots));
-            } catch (ClassCastException e) {
-
-            }
-        }
-        processAnnotations(annotationList, config.getClass().getName(), config, runtime, classLoader);
-    }
-
-    private static void validateField(Field field, Object value, Runtime runtime, ClassLoader classLoader) {
-        List<Annotation> annotationList = new LinkedList<>();
-        Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
-        for (Class clazz : classes) {
-            try {
-                Annotation[] anots = field.getAnnotationsByType(clazz);
-                annotationList.addAll(Arrays.asList(anots));
-            } catch (ClassCastException e) {
-
-            }
-        }
-        processAnnotations(annotationList, field.getName(), value, runtime, classLoader);
-    }
-
-    private static void processAnnotations( List<Annotation> annotations, String fieldName, Object value,
-                                           Runtime runtime, ClassLoader classLoader) {
-        try {
-            for (Annotation annotation : annotations) {
-
-                String type = annotation.annotationType().getName();
-                Class<?> validatorClass = null;
-                Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
-                //check if annotation is one of our
-                for (Class<?> clazz : classes) {
-                    if (clazz.getName().equals(type)) {
-                        validatorClass = clazz;
-                        break;
-                    }
-                }
-                if (validatorClass != null) {
-                    Object v = validatorClass.cast(annotation);
-                    if (hasMethod(validatorClass, VALIDATOR_CLASS)) {
-
-                        @SuppressWarnings("unchecked")
-                        Class<Validator> clazz = (Class<Validator>) validatorClass
-                                .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
-                        Validator o = null;
-                        Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
-
-                        if (params.containsKey(TARGET_RUNTIME)
-                                && params.get(TARGET_RUNTIME) != Runtime.ALL
-                                && params.get(TARGET_RUNTIME) != runtime) {
-                            continue;
-                        }
-                        params.put(ACTUAL_RUNTIME, runtime);
-                        //two constructor signatures used to initialize validators.
-                        //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
-
-                        //If validator has a constructor that takes a Map as an argument call that constructor
-                        if (hasConstructor(clazz, Map.class)) {
-                            o = clazz.getConstructor(Map.class).newInstance(params);
-                        } else { //If not call default constructor
-                            o = clazz.newInstance();
-                        }
-                        o.validateField(fieldName, value, classLoader);
-                    }
-                }
-            }
-        } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static boolean hasMethod(Class<?> clazz, String method) {
-        try {
-            clazz.getMethod(method);
-            return true;
-        } catch (NoSuchMethodException e) {
-           return false;
-        }
-    }
-
-    private static Map<String, Object> getParamsFromAnnotation(Class<?> validatorClass, Object v)
-            throws InvocationTargetException, IllegalAccessException {
-        Map<String, Object> params = new HashMap<String, Object>();
-        for (Method method : validatorClass.getDeclaredMethods()) {
-
-            Object value = null;
-            try {
-                value = (Object) method.invoke(v);
-            } catch (IllegalArgumentException ex) {
-                value = null;
-            }
-            if (value != null) {
-                params.put(method.getName(), value);
-            }
-        }
-        return params;
-    }
-
-    public static boolean hasConstructor(Class<?> clazz, Class<?> paramClass) {
-        Class<?>[] classes = { paramClass };
-        try {
-            clazz.getConstructor(classes);
-        } catch (NoSuchMethodException e) {
-            return false;
-        }
-        return true;
-    }
-}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
deleted file mode 100644
index d562404a90..0000000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.utils.validation;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Repeatable;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-public class ConfigValidationAnnotations {
-
-    /**
-     * Validates on object is not null
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface NotNull {
-        Class<?> validatorClass() default ValidatorImpls.NotNullValidator.class;
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-    /**
-     * Checks if a number is positive and whether zero inclusive Validator with fields: validatorClass, includeZero
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isPositiveNumber {
-        Class<?> validatorClass() default ValidatorImpls.PositiveNumberValidator.class;
-
-        boolean includeZero() default false;
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-
-    /**
-     * Checks if resources specified are valid
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isValidResources {
-
-        Class<?> validatorClass() default ValidatorImpls.ResourcesValidator.class;
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-    /**
-     * validates each entry in a list is of a certain type
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isListEntryType {
-        Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
-
-        Class<?> type();
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isStringList {
-        Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
-
-        Class<?> type() default String.class;
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-    /**
-     * Validates each entry in a list with a list of validators Validators with fields: validatorClass and entryValidatorClass
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isListEntryCustom {
-        Class<?> validatorClass() default ValidatorImpls.ListEntryCustomValidator.class;
-
-        Class<?>[] entryValidatorClasses();
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-
-    /**
-     * Validates the type of each key and value in a map Validator with fields: validatorClass, keyValidatorClass, valueValidatorClass
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isMapEntryType {
-        Class<?> validatorClass() default ValidatorImpls.MapEntryTypeValidator.class;
-
-        Class<?> keyType();
-
-        Class<?> valueType();
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-    /**
-     * Checks if class name is assignable to the provided class/interfaces
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isImplementationOfClass {
-        Class<?> validatorClass() default ValidatorImpls.ImplementsClassValidator.class;
-
-        Class<?> implementsClass();
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
-    }
-
-    /**
-     * Checks if class name is assignable to ONE of the provided list class/interfaces
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isImplementationOfClasses {
-        Class<?> validatorClass() default ValidatorImpls.ImplementsClassesValidator.class;
-
-        Class<?>[] implementsClasses();
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
-    }
-
-    /**
-     * Validates a each key and value in a Map with a list of validators Validator with fields: validatorClass, keyValidatorClasses,
-     * valueValidatorClasses
-     */
-    @Repeatable(isMapEntryCustoms.class)
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isMapEntryCustom {
-        Class<?> validatorClass() default ValidatorImpls.MapEntryCustomValidator.class;
-
-        Class<?>[] keyValidatorClasses() default {};
-
-        Class<?>[] valueValidatorClasses() default {};
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isMapEntryCustoms {
-        isMapEntryCustom[] value();
-    }
-
-    /**
-     * checks if the topic name is valid
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isValidTopicName {
-        Class<?> validatorClass() default ValidatorImpls.TopicNameValidator.class;
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-    /**
-     * checks if the topic name is valid
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isValidSerde {
-        Class<?> validatorClass() default ValidatorImpls.SerdeValidator.class;
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
-    }
-
-    /**
-     * checks if window configs is valid
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isValidWindowConfig {
-        Class<?> validatorClass() default ValidatorImpls.WindowConfigValidator.class;
-
-        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
-    }
-
-    /**
-     * check if file exists
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
-    public @interface isFileExists {
-        Class<?> validatorClass() default ValidatorImpls.FileValidator.class;
-    }
-
-    /**
-     * checks function config as a whole to make sure all fields are valid
-     */
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target({ElementType.TYPE})
-    public @interface isValidFunctionConfig {
-        Class<?> validatorClass() default ValidatorImpls.FunctionConfigValidator.class;
-    }
-
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target({ElementType.TYPE})
-    public @interface isValidSourceConfig {
-        Class<?> validatorClass() default ValidatorImpls.SourceConfigValidator.class;
-    }
-
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target({ElementType.TYPE})
-    public @interface isValidSinkConfig {
-        Class<?> validatorClass() default ValidatorImpls.SinkConfigValidator.class;
-    }
-    /**
-     * Field names for annotations
-     */
-    public static class ValidatorParams {
-        static final String VALIDATOR_CLASS = "validatorClass";
-        static final String TYPE = "type";
-        static final String BASE_TYPE = "baseType";
-        static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses";
-        static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses";
-        static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses";
-        static final String KEY_TYPE = "keyType";
-        static final String VALUE_TYPE = "valueType";
-        static final String INCLUDE_ZERO = "includeZero";
-        static final String ACCEPTED_VALUES = "acceptedValues";
-        static final String IMPLEMENTS_CLASS = "implementsClass";
-        static final String IMPLEMENTS_CLASSES = "implementsClasses";
-        static final String ACTUAL_RUNTIME = "actualRuntime";
-        static final String TARGET_RUNTIME = "targetRuntime";
-    }
-}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java
deleted file mode 100644
index f81cf068d6..0000000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.utils.validation;
-
-import java.util.Map;
-
-public class ConfigValidationUtils {
-    /**
-     * Returns a new NestableFieldValidator for a given class.
-     *
-     * @param cls     the Class the field should be a type of
-     * @param notNull whether or not a value of null is valid
-     * @return a NestableFieldValidator for that class
-     */
-    public static NestableFieldValidator fv(final Class cls, final boolean notNull) {
-        return new NestableFieldValidator() {
-            @Override
-            public void validateField(String pd, String name, Object field)
-                throws IllegalArgumentException {
-                if (field == null) {
-                    if (notNull) {
-                        throw new IllegalArgumentException("Field " + name + " must not be null");
-                    } else {
-                        return;
-                    }
-                }
-                if (!cls.isInstance(field)) {
-                    throw new IllegalArgumentException(
-                        pd + name + " must be a " + cls.getName() + ". (" + field + ")");
-                }
-            }
-        };
-    }
-
-    /**
-     * Returns a new NestableFieldValidator for a List of the given Class.
-     *
-     * @param cls     the Class of elements composing the list
-     * @param notNull whether or not a value of null is valid
-     * @return a NestableFieldValidator for a list of the given class
-     */
-    public static NestableFieldValidator listFv(Class cls, boolean notNull) {
-        return listFv(fv(cls, notNull), notNull);
-    }
-
-    /**
-     * Returns a new NestableFieldValidator for a List where each item is validated by validator.
-     *
-     * @param validator used to validate each item in the list
-     * @param notNull   whether or not a value of null is valid
-     * @return a NestableFieldValidator for a list with each item validated by a different validator.
-     */
-    public static NestableFieldValidator listFv(final NestableFieldValidator validator,
-                                                final boolean notNull) {
-        return new NestableFieldValidator() {
-            @Override
-            public void validateField(String pd, String name, Object field)
-                throws IllegalArgumentException {
-
-                if (field == null) {
-                    if (notNull) {
-                        throw new IllegalArgumentException("Field " + name + " must not be null");
-                    } else {
-                        return;
-                    }
-                }
-                if (field instanceof Iterable) {
-                    for (Object e : (Iterable) field) {
-                        validator.validateField(pd + "Each element of the list ", name, e);
-                    }
-                    return;
-                }
-                throw new IllegalArgumentException(
-                    "Field " + name + " must be an Iterable but was " +
-                    ((field == null) ? "null" : ("a " + field.getClass())));
-            }
-        };
-    }
-
-    /**
-     * Returns a new NestableFieldValidator for a Map of key to val.
-     *
-     * @param key     the Class of keys in the map
-     * @param val     the Class of values in the map
-     * @param notNull whether or not a value of null is valid
-     * @return a NestableFieldValidator for a Map of key to val
-     */
-    public static NestableFieldValidator mapFv(Class key, Class val,
-                                               boolean notNull) {
-        return mapFv(fv(key, false), fv(val, false), notNull);
-    }
-
-    /**
-     * Returns a new NestableFieldValidator for a Map.
-     *
-     * @param key     a validator for the keys in the map
-     * @param val     a validator for the values in the map
-     * @param notNull whether or not a value of null is valid
-     * @return a NestableFieldValidator for a Map
-     */
-    public static NestableFieldValidator mapFv(final NestableFieldValidator key,
-                                               final NestableFieldValidator val, final boolean notNull) {
-        return new NestableFieldValidator() {
-            @SuppressWarnings("unchecked")
-            @Override
-            public void validateField(String pd, String name, Object field)
-                throws IllegalArgumentException {
-                if (field == null) {
-                    if (notNull) {
-                        throw new IllegalArgumentException("Field " + name + " must not be null");
-                    } else {
-                        return;
-                    }
-                }
-                if (field instanceof Map) {
-                    for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) field).entrySet()) {
-                        key.validateField("Each key of the map ", name, entry.getKey());
-                        val.validateField("Each value in the map ", name, entry.getValue());
-                    }
-                    return;
-                }
-                throw new IllegalArgumentException(
-                    "Field " + name + " must be a Map");
-            }
-        };
-    }
-
-    /**
-     * Declares methods for validating configuration values.
-     */
-    public static interface FieldValidator {
-        /**
-         * Validates the given field.
-         *
-         * @param name  the name of the field.
-         * @param field The field to be validated.
-         * @throws IllegalArgumentException if the field fails validation.
-         */
-        public void validateField(String name, Object field) throws IllegalArgumentException;
-    }
-
-    /**
-     * Declares a method for validating configuration values that is nestable.
-     */
-    public static abstract class NestableFieldValidator implements FieldValidator {
-        @Override
-        public void validateField(String name, Object field) throws IllegalArgumentException {
-            validateField(null, name, field);
-        }
-
-        /**
-         * Validates the given field.
-         *
-         * @param pd    describes the parent wrapping this validator.
-         * @param name  the name of the field.
-         * @param field The field to be validated.
-         * @throws IllegalArgumentException if the field fails validation.
-         */
-        public abstract void validateField(String pd, String name, Object field) throws IllegalArgumentException;
-    }
-}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
deleted file mode 100644
index 08690fe855..0000000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ /dev/null
@@ -1,975 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.utils.validation;
-
-import static org.apache.pulsar.functions.utils.Utils.fileExists;
-import static org.apache.pulsar.functions.utils.Utils.getSinkType;
-import static org.apache.pulsar.functions.utils.Utils.getSourceType;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.HashSet;
-import java.util.Map;
-
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Resources;
-import org.apache.pulsar.functions.utils.SinkConfig;
-import org.apache.pulsar.functions.utils.SourceConfig;
-import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.functions.utils.WindowConfig;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-
-import net.jodah.typetools.TypeResolver;
-
-@Slf4j
-public class ValidatorImpls {
-
-    private static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";
-
-    /**
-     * Validates a positive number.
-     */
-    public static class PositiveNumberValidator extends Validator {
-
-        private boolean includeZero;
-
-        public PositiveNumberValidator() {
-            this.includeZero = false;
-        }
-
-        public PositiveNumberValidator(Map<String, Object> params) {
-            this.includeZero = (boolean) params.get(ConfigValidationAnnotations.ValidatorParams.INCLUDE_ZERO);
-        }
-
-        public static void validateField(String name, boolean includeZero, Object o) {
-            if (o == null) {
-                return;
-            }
-            if (o instanceof Number) {
-                if (includeZero) {
-                    if (((Number) o).doubleValue() >= 0.0) {
-                        return;
-                    }
-                } else {
-                    if (((Number) o).doubleValue() > 0.0) {
-                        return;
-                    }
-                }
-            }
-            throw new IllegalArgumentException(String.format("Field '%s' must be a Positive Number", name));
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            validateField(name, this.includeZero, o);
-        }
-    }
-
-    /**
-     * Validates if an object is not null.
-     */
-
-    public static class NotNullValidator extends Validator {
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            if (o == null) {
-                throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name));
-            }
-        }
-    }
-
-    public static class ResourcesValidator extends Validator {
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            if (o == null) {
-                return;
-            }
-
-            if (o instanceof Resources) {
-                Resources resources = (Resources) o;
-                Double cpu = resources.getCpu();
-                Long ram = resources.getRam();
-                Long disk = resources.getDisk();
-                com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0.0,
-                        "The cpu allocation for the function must be positive");
-                com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0L,
-                        "The ram allocation for the function must be positive");
-                com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0L,
-                        "The disk allocation for the function must be positive");
-            } else {
-                throw new IllegalArgumentException(String.format("Field '%s' must be of Resource type!", name));
-            }
-        }
-    }
-
-    /**
-     * Validates each entry in a list.
-     */
-    public static class ListEntryTypeValidator extends Validator {
-
-        private Class<?> type;
-
-        public ListEntryTypeValidator(Map<String, Object> params) {
-            this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
-        }
-
-        public static void validateField(String name, Class<?> type, Object o) {
-            ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.listFv(type, false);
-            validator.validateField(name, o);
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            validateField(name, this.type, o);
-        }
-    }
-
-    /**
-     * validates each key and value in a map of a certain type.
-     */
-    public static class MapEntryTypeValidator extends Validator {
-
-        private Class<?> keyType;
-        private Class<?> valueType;
-
-        public MapEntryTypeValidator(Map<String, Object> params) {
-            this.keyType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE);
-            this.valueType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE);
-        }
-
-        public static void validateField(String name, Class<?> keyType, Class<?> valueType, Object o) {
-            ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(keyType, valueType, false);
-            validator.validateField(name, o);
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            validateField(name, this.keyType, this.valueType, o);
-        }
-    }
-
-    public static class ImplementsClassValidator extends Validator {
-
-        Class<?> classImplements;
-
-        public ImplementsClassValidator(Map<String, Object> params) {
-            this.classImplements = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
-        }
-
-        public ImplementsClassValidator(Class<?> classImplements) {
-            this.classImplements = classImplements;
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            if (o == null) {
-                return;
-            }
-            SimpleTypeValidator.validateField(name, String.class, o);
-            String className = (String) o;
-            if (StringUtils.isEmpty(className)) {
-                return;
-            }
-
-            Class<?> objectClass;
-            try {
-                objectClass = loadClass(className, classLoader);
-            } catch (ClassNotFoundException e) {
-                throw new IllegalArgumentException("Cannot find/load class " + className);
-            }
-
-            if (!this.classImplements.isAssignableFrom(objectClass)) {
-                throw new IllegalArgumentException(
-                        String.format("Field '%s' with value '%s' does not implement %s",
-                                name, o, this.classImplements.getName()));
-            }
-        }
-    }
-
-    /**
-     * validates class implements one of these classes
-     */
-    public static class ImplementsClassesValidator extends Validator {
-
-        Class<?>[] classesImplements;
-
-        public ImplementsClassesValidator(Map<String, Object> params) {
-            this.classesImplements = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASSES);
-        }
-
-        public ImplementsClassesValidator(Class<?>... classesImplements) {
-            this.classesImplements = classesImplements;
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            if (o == null) {
-                return;
-            }
-            SimpleTypeValidator.validateField(name, String.class, o);
-            String className = (String) o;
-            if (StringUtils.isEmpty(className)) {
-                return;
-            }
-            int count = 0;
-            for (Class<?> classImplements : classesImplements) {
-                Class<?> objectClass = null;
-                try {
-                    objectClass = loadClass(className, classLoader);
-                } catch (ClassNotFoundException e) {
-                    throw new IllegalArgumentException("Cannot find/load class " + className);
-                }
-
-                if (classImplements.isAssignableFrom(objectClass)) {
-                    count++;
-                }
-            }
-            if (count == 0) {
-                throw new IllegalArgumentException(
-                        String.format("Field '%s' with value '%s' does not implement any of these classes %s",
-                                name, o, Arrays.asList(classesImplements)));
-            }
-        }
-    }
-
-    @NoArgsConstructor
-    public static class SerdeValidator extends Validator {
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            if (o != null && o.equals(DEFAULT_SERDE)) return;
-            new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, o, classLoader);
-        }
-    }
-
-    @NoArgsConstructor
-    public static class SchemaValidator extends Validator {
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            new ValidatorImpls.ImplementsClassValidator(Schema.class).validateField(name, o, classLoader);
-        }
-    }
-
-
-    /**
-     * validates each key and each value against the respective arrays of validators.
-     */
-    public static class MapEntryCustomValidator extends Validator {
-
-        private Class<?>[] keyValidators;
-        private Class<?>[] valueValidators;
-
-        public MapEntryCustomValidator(Map<String, Object> params) {
-            this.keyValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES);
-            this.valueValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
-        }
-
-        @SuppressWarnings("unchecked")
-        public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o,
-                                         ClassLoader classLoader)
-                throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
-            if (o == null) {
-                return;
-            }
-            //check if Map
-            SimpleTypeValidator.validateField(name, Map.class, o);
-            for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) {
-                for (Class<?> kv : keyValidators) {
-                    Object keyValidator = kv.getConstructor().newInstance();
-                    if (keyValidator instanceof Validator) {
-                        ((Validator) keyValidator).validateField(name + " Map key", entry.getKey(), classLoader);
-                    } else {
-                        log.warn(
-                                "validator: {} cannot be used in MapEntryCustomValidator to validate keys.  Individual entry validators must " +
-                                        "a instance of Validator class",
-                                kv.getName());
-                    }
-                }
-                for (Class<?> vv : valueValidators) {
-                    Object valueValidator = vv.getConstructor().newInstance();
-                    if (valueValidator instanceof Validator) {
-                        ((Validator) valueValidator).validateField(name + " Map value", entry.getValue(), classLoader);
-                    } else {
-                        log.warn(
-                                "validator: {} cannot be used in MapEntryCustomValidator to validate values.  Individual entry validators " +
-                                        "must a instance of Validator class",
-                                vv.getName());
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            try {
-                validateField(name, this.keyValidators, this.valueValidators, o, classLoader);
-            } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    @NoArgsConstructor
-    public static class StringValidator extends Validator {
-
-        private HashSet<String> acceptedValues = null;
-
-        public StringValidator(Map<String, Object> params) {
-
-            this.acceptedValues =
-                    new HashSet<String>(Arrays.asList((String[]) params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES)));
-
-            if (this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) {
-                this.acceptedValues = null;
-            }
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            SimpleTypeValidator.validateField(name, String.class, o);
-            if (this.acceptedValues != null) {
-                if (!this.acceptedValues.contains((String) o)) {
-                    throw new IllegalArgumentException(
-                            "Field " + name + " is not an accepted value. Value: " + o + " Accepted values: " + this.acceptedValues);
-                }
-            }
-        }
-    }
-    @NoArgsConstructor
-    public static class FunctionConfigValidator extends Validator {
-
-        private static void doJavaChecks(FunctionConfig functionConfig, String name, ClassLoader clsLoader) {
-            Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig, clsLoader);
-            // inputs use default schema, so there is no check needed there
-
-            // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
-            // implements SerDe class
-            if (functionConfig.getCustomSerdeInputs() != null) {
-                functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
-                    validateSerde(inputSerializer, typeArgs[0], name, clsLoader, true);
-                });
-            }
-
-            // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
-            // implements SerDe class
-            if (functionConfig.getCustomSchemaInputs() != null) {
-                functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> {
-                    validateSchema(schemaType, typeArgs[0], name, clsLoader, true);
-                });
-            }
-
-            // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
-            // implements Schema or SerDe classes
-
-            if (functionConfig.getInputSpecs() != null) {
-                functionConfig.getInputSpecs().forEach((topicName, conf) -> {
-                    // Need to make sure that one and only one of schema/serde is set
-                    if ((conf.getSchemaType() != null && !conf.getSchemaType().isEmpty())
-                            && (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty())) {
-                        throw new IllegalArgumentException(
-                                String.format("Only one of schemaType or serdeClassName should be set in inputSpec"));
-                    }
-                    if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
-                        validateSerde(conf.getSerdeClassName(), typeArgs[0], name, clsLoader, true);
-                    }
-                    if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) {
-                        validateSchema(conf.getSchemaType(), typeArgs[0], name, clsLoader, true);
-                    }
-                });
-            }
-
-            if (Void.class.equals(typeArgs[1])) {
-                return;
-            }
-
-            // One and only one of outputSchemaType and outputSerdeClassName should be set
-            if ((functionConfig.getOutputSerdeClassName() != null && !functionConfig.getOutputSerdeClassName().isEmpty())
-                    && (functionConfig.getOutputSchemaType()!= null && !functionConfig.getOutputSchemaType().isEmpty())) {
-                throw new IllegalArgumentException(
-                        String.format("Only one of outputSchemaType or outputSerdeClassName should be set"));
-            }
-
-            if (functionConfig.getOutputSchemaType() != null && !functionConfig.getOutputSchemaType().isEmpty()) {
-                validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], name, clsLoader, false);
-            }
-
-            if (functionConfig.getOutputSerdeClassName() != null && !functionConfig.getOutputSerdeClassName().isEmpty()) {
-                validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], name, clsLoader, false);
-            }
-
-        }
-
-        private static void validateSchema(String schemaType, Class<?> typeArg, String name, ClassLoader clsLoader,
-                                           boolean input) {
-            if (StringUtils.isEmpty(schemaType) || getBuiltinSchemaType(schemaType) != null) {
-                // If it's empty, we use the default schema and no need to validate
-                // If it's built-in, no need to validate
-            } else {
-                try {
-                    new SchemaValidator().validateField(name, schemaType, clsLoader);
-                } catch (IllegalArgumentException ex) {
-                    throw new IllegalArgumentException(
-                            String.format("The input schema class %s does not not implement %s",
-                                    schemaType, Schema.class.getCanonicalName()));
-                }
-
-                validateSchemaType(schemaType, typeArg, clsLoader, input);
-            }
-        }
-
-        private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader,
-                                          boolean deser) {
-            if (StringUtils.isEmpty(inputSerializer)) return;
-            if (inputSerializer.equals(DEFAULT_SERDE)) return;
-            Class<?> serdeClass;
-            try {
-                serdeClass = loadClass(inputSerializer, clsLoader);
-            } catch (ClassNotFoundException e) {
-                throw new IllegalArgumentException(
-                        String.format("The input serialization/deserialization class %s does not exist",
-                                inputSerializer));
-            }
-
-            try {
-                new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer, clsLoader);
-            } catch (IllegalArgumentException ex) {
-                throw new IllegalArgumentException(
-                        String.format("The input serialization/deserialization class %s does not not implement %s",
-
-                                inputSerializer, SerDe.class.getCanonicalName()));
-            }
-
-            SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
-            if (serDe == null) {
-                throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
-                        inputSerializer));
-            }
-            Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
-            // type inheritance information seems to be lost in generic type
-            // load the actual type class for verification
-            Class<?> fnInputClass;
-            Class<?> serdeInputClass;
-            try {
-                fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
-                serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
-            } catch (ClassNotFoundException e) {
-                throw new IllegalArgumentException("Failed to load type class", e);
-            }
-
-            if (deser) {
-                if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                    throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
-                }
-            } else {
-                if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
-                    throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
-                }
-            }
-        }
-
-        private static void doPythonChecks(FunctionConfig functionConfig, String name) {
-            if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
-            }
-
-            if (functionConfig.getWindowConfig() != null) {
-                throw new IllegalArgumentException("There is currently no support windowing in python");
-            }
-
-            if (functionConfig.getMaxMessageRetries() >= 0) {
-                throw new IllegalArgumentException("Message retries not yet supported in python");
-            }
-        }
-
-        private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
-            if (inputTopics.contains(outputTopic)) {
-                throw new IllegalArgumentException(
-                        String.format("Output topic %s is also being used as an input topic (topics must be one or the other)",
-                                outputTopic));
-            }
-        }
-
-        private static void doCommonChecks(FunctionConfig functionConfig) {
-            Collection<String> allInputTopics = collectAllInputTopics(functionConfig);
-            if (allInputTopics.isEmpty()) {
-                throw new RuntimeException("No input topic(s) specified for the function");
-            }
-
-            // Ensure that topics aren't being used as both input and output
-            verifyNoTopicClash(allInputTopics, functionConfig.getOutput());
-
-            WindowConfig windowConfig = functionConfig.getWindowConfig();
-            if (windowConfig != null) {
-                // set auto ack to false since windowing framework is responsible
-                // for acking and not the function framework
-                if (functionConfig.isAutoAck() == true) {
-                    throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
-                }
-                functionConfig.setAutoAck(false);
-            }
-
-            if (functionConfig.getTimeoutMs() != null
-                    && functionConfig.getProcessingGuarantees() != null
-                    && functionConfig.getProcessingGuarantees() != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
-                throw new IllegalArgumentException("Message timeout can only be specified with processing guarantee is "
-                        + FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
-            }
-
-            if (functionConfig.getMaxMessageRetries() >= 0
-                    && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                throw new IllegalArgumentException("MaxMessageRetries and Effectively once don't gel well");
-            }
-            if (functionConfig.getMaxMessageRetries() < 0 && !StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
-                throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
-            }
-        }
-
-        private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {
-            List<String> retval = new LinkedList<>();
-            if (functionConfig.getInputs() != null) {
-                retval.addAll(functionConfig.getInputs());
-            }
-            if (functionConfig.getTopicsPattern() != null) {
-                retval.add(functionConfig.getTopicsPattern());
-            }
-            if (functionConfig.getCustomSerdeInputs() != null) {
-                retval.addAll(functionConfig.getCustomSerdeInputs().keySet());
-            }
-            if (functionConfig.getCustomSchemaInputs() != null) {
-                retval.addAll(functionConfig.getCustomSchemaInputs().keySet());
-            }
-            if (functionConfig.getInputSpecs() != null) {
-                retval.addAll(functionConfig.getInputSpecs().keySet());
-            }
-            return retval;
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            FunctionConfig functionConfig = (FunctionConfig) o;
-            doCommonChecks(functionConfig);
-            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-                if (classLoader != null) {
-                    doJavaChecks(functionConfig, name, classLoader);
-                }
-            } else {
-                doPythonChecks(functionConfig, name);
-            }
-        }
-    }
-
-    /**
-     * Validates each entry in a list against a list of custom Validators. Each validator in the list of validators must inherit or be an
-     * instance of Validator class
-     */
-    public static class ListEntryCustomValidator extends Validator {
-
-        private Class<?>[] entryValidators;
-
-        public ListEntryCustomValidator(Map<String, Object> params) {
-            this.entryValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
-        }
-
-        public static void validateField(String name, Class<?>[] validators, Object o, ClassLoader classLoader)
-                throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
-            if (o == null) {
-                return;
-            }
-            //check if iterable
-            SimpleTypeValidator.validateField(name, Iterable.class, o);
-            for (Object entry : (Iterable<?>) o) {
-                for (Class<?> validator : validators) {
-                    Object v = validator.getConstructor().newInstance();
-                    if (v instanceof Validator) {
-                        ((Validator) v).validateField(name + " list entry", entry, classLoader);
-                    } else {
-                        log.warn(
-                                "validator: {} cannot be used in ListEntryCustomValidator.  Individual entry validators must a instance of " +
-                                        "Validator class",
-                                validator.getName());
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            try {
-                validateField(name, this.entryValidators, o, classLoader);
-            } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    @NoArgsConstructor
-    public static class TopicNameValidator extends Validator {
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            if (o == null) {
-                return;
-            }
-            new StringValidator().validateField(name, o, classLoader);
-            String topic = (String) o;
-            if (!TopicName.isValid(topic)) {
-                throw new IllegalArgumentException(
-                        String.format("The topic name %s is invalid for field '%s'", topic, name));
-            }
-        }
-    }
-
-    public static class WindowConfigValidator extends Validator{
-
-        public static void validateWindowConfig(WindowConfig windowConfig) {
-            if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) {
-                throw new IllegalArgumentException("Window length is not specified");
-            }
-
-            if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) {
-                throw new IllegalArgumentException(
-                        "Window length for time and count are set! Please set one or the other.");
-            }
-
-            if (windowConfig.getWindowLengthCount() != null) {
-                if (windowConfig.getWindowLengthCount() <= 0) {
-                    throw new IllegalArgumentException(
-                            "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
-                }
-            }
-
-            if (windowConfig.getWindowLengthDurationMs() != null) {
-                if (windowConfig.getWindowLengthDurationMs() <= 0) {
-                    throw new IllegalArgumentException(
-                            "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
-                }
-            }
-
-            if (windowConfig.getSlidingIntervalCount() != null) {
-                if (windowConfig.getSlidingIntervalCount() <= 0) {
-                    throw new IllegalArgumentException(
-                            "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
-                }
-            }
-
-            if (windowConfig.getSlidingIntervalDurationMs() != null) {
-                if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
-                    throw new IllegalArgumentException(
-                            "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
-                }
-            }
-
-            if (windowConfig.getTimestampExtractorClassName() != null) {
-                if (windowConfig.getMaxLagMs() != null) {
-                    if (windowConfig.getMaxLagMs() < 0) {
-                        throw new IllegalArgumentException(
-                                "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
-                    }
-                }
-                if (windowConfig.getWatermarkEmitIntervalMs() != null) {
-                    if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
-                        throw new IllegalArgumentException(
-                                "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            if (o == null) {
-                return;
-            }
-            if (!(o instanceof WindowConfig)) {
-                throw new IllegalArgumentException(String.format("Field '%s' must be of WindowConfig type!", name));
-            }
-            WindowConfig windowConfig = (WindowConfig) o;
-            validateWindowConfig(windowConfig);
-        }
-    }
-
-    public static class SourceConfigValidator extends Validator {
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            SourceConfig sourceConfig = (SourceConfig) o;
-            if (classLoader == null) {
-                // This happens at the cli for builtin. There is no need to check this since
-                // the actual check will be done at serverside
-                return;
-            }
-
-            String sourceClassName;
-            try {
-                sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
-            } catch (IOException e1) {
-                throw new IllegalArgumentException("Failed to extract source class from archive", e1);
-            }
-
-
-            Class<?> typeArg = getSourceType(sourceClassName, classLoader);
-
-            // Only one of serdeClassName or schemaType should be set
-            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
-                throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
-            }
-
-            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
-                FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, classLoader, false);
-            }
-            if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
-                FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, classLoader, false);
-            }
-        }
-    }
-
-    public static class SinkConfigValidator extends Validator {
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            SinkConfig sinkConfig = (SinkConfig) o;
-            if (classLoader == null) {
-                // This happens at the cli for builtin. There is no need to check this since
-                // the actual check will be done at serverside
-                return;
-            }
-
-            // if function-pkg url is present eg: file://xyz.jar then admin-tool might not have access of the file at
-            // the same location so, need to rely on server side validation.
-            if (Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive())) {
-                return;
-            }
-
-            // make we sure we have one source of input
-            if (collectAllInputTopics(sinkConfig).isEmpty()) {
-                throw new IllegalArgumentException("Must specify at least one topic of input via topicToSerdeClassName, " +
-                        "topicsPattern, topicToSchemaType or inputSpecs");
-            }
-
-
-            String sinkClassName;
-            try {
-                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
-            } catch (IOException e1) {
-                throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
-            }
-            Class<?> typeArg = getSinkType(sinkClassName, classLoader);
-
-            if (sinkConfig.getTopicToSerdeClassName() != null) {
-                sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
-                    FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, classLoader, true);
-                });
-            }
-
-            if (sinkConfig.getTopicToSchemaType() != null) {
-                sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
-                    FunctionConfigValidator.validateSchema(schemaType, typeArg, name, classLoader, true);
-                });
-            }
-
-            // topicsPattern does not need checks
-
-            if (sinkConfig.getInputSpecs() != null) {
-                sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
-                    // Only one is set
-                    if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()
-                            && consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
-                        throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
-                    }
-                    if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) {
-                        FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, classLoader, true);
-                    }
-                    if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
-                        FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, classLoader, true);
-                    }
-                });
-            }
-        }
-
-        private static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
-            List<String> retval = new LinkedList<>();
-            if (sinkConfig.getInputs() != null) {
-                retval.addAll(sinkConfig.getInputs());
-            }
-            if (sinkConfig.getTopicToSerdeClassName() != null) {
-                retval.addAll(sinkConfig.getTopicToSerdeClassName().keySet());
-            }
-            if (sinkConfig.getTopicsPattern() != null) {
-                retval.add(sinkConfig.getTopicsPattern());
-            }
-            if (sinkConfig.getTopicToSchemaType() != null) {
-                retval.addAll(sinkConfig.getTopicToSchemaType().keySet());
-            }
-            if (sinkConfig.getInputSpecs() != null) {
-                retval.addAll(sinkConfig.getInputSpecs().keySet());
-            }
-            return retval;
-        }
-    }
-
-    public static class FileValidator extends Validator {
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            if (o == null) {
-                return;
-            }
-            new StringValidator().validateField(name, o, classLoader);
-
-            String path = (String) o;
-
-            if(!Utils.isFunctionPackageUrlSupported(path)) {
-                // check file existence if path is not url and local path
-                if (!path.startsWith(Utils.BUILTIN) && !fileExists(path)) {
-                    throw new IllegalArgumentException
-                            (String.format("File %s specified in field '%s' does not exist", path, name));
-                }
-            }
-        }
-    }
-
-    /**
-     * Validates basic types.
-     */
-    public static class SimpleTypeValidator extends Validator {
-
-        private Class<?> type;
-
-        public SimpleTypeValidator(Map<String, Object> params) {
-            this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
-        }
-
-        public static void validateField(String name, Class<?> type, Object o) {
-            if (o == null) {
-                return;
-            }
-            if (type.isInstance(o)) {
-                return;
-            }
-            throw new IllegalArgumentException(
-                    "Field " + name + " must be of type " + type + ". Object: " + o + " actual type: " + o.getClass());
-        }
-
-        @Override
-        public void validateField(String name, Object o, ClassLoader classLoader) {
-            validateField(name, this.type, o);
-        }
-    }
-
-    private static Class<?> loadClass(String className, ClassLoader classLoader) throws ClassNotFoundException {
-        Class<?> objectClass;
-        try {
-            objectClass = Class.forName(className);
-        } catch (ClassNotFoundException e) {
-            if (classLoader != null) {
-                objectClass = classLoader.loadClass(className);
-            } else {
-                throw e;
-            }
-        }
-        return objectClass;
-    }
-
-
-    private static SchemaType getBuiltinSchemaType(String schemaTypeOrClassName) {
-        try {
-            return SchemaType.valueOf(schemaTypeOrClassName.toUpperCase());
-        } catch (IllegalArgumentException e) {
-            // schemaType is not referring to builtin type
-            return null;
-        }
-    }
-
-    private static void validateSchemaType(String scheamType, Class<?> typeArg, ClassLoader clsLoader, boolean input) {
-        validateCustomSchemaType(scheamType, typeArg, clsLoader, input);
-    }
-
-    private static void validateSerDeType(String serdeClassName, Class<?> typeArg, ClassLoader clsLoader) {
-        SerDe<?> serDe = (SerDe<?>) Reflections.createInstance(serdeClassName, clsLoader);
-        if (serDe == null) {
-            throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
-                    serdeClassName));
-        }
-        Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
-        // type inheritance information seems to be lost in generic type
-        // load the actual type class for verification
-        Class<?> fnInputClass;
-        Class<?> serdeInputClass;
-        try {
-            fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
-            serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
-        } catch (ClassNotFoundException e) {
-            throw new IllegalArgumentException("Failed to load type class", e);
-        }
-
-        if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-            throw new IllegalArgumentException(
-                    "Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
-        }
-    }
-
-    private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader,
-                                                 boolean input) {
-        Schema<?> schema = (Schema<?>) Reflections.createInstance(schemaClassName, clsLoader);
-        if (schema == null) {
-            throw new IllegalArgumentException(String.format("The Schema class %s does not exist",
-                    schemaClassName));
-        }
-        Class<?>[] schemaTypes = TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
-
-        // type inheritance information seems to be lost in generic type
-        // load the actual type class for verification
-        Class<?> fnInputClass;
-        Class<?> schemaInputClass;
-        try {
-            fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
-            schemaInputClass = Class.forName(schemaTypes[0].getName(), true, clsLoader);
-        } catch (ClassNotFoundException e) {
-            throw new IllegalArgumentException("Failed to load type class", e);
-        }
-
-        if (input) {
-            if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
-                throw new IllegalArgumentException(
-                        "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
-            }
-        } else {
-            if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
-                throw new IllegalArgumentException(
-                        "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
new file mode 100644
index 0000000000..72dd26eabf
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link Exceptions}.
+ */
+public class UtilsTest {
+
+    @Test
+    public void testValidateLocalFileUrl() throws Exception {
+        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        try {
+            // eg: fileLocation : /dir/fileName.jar (invalid)
+            Utils.extractClassLoader(fileLocation);
+            Assert.fail("should fail with invalid url: without protocol");
+        } catch (IllegalArgumentException ie) {
+            // Ok.. expected exception
+        }
+        String fileLocationWithProtocol = "file://" + fileLocation;
+        // eg: fileLocation : file:///dir/fileName.jar (valid)
+        Utils.extractClassLoader(fileLocationWithProtocol);
+        // eg: fileLocation : file:/dir/fileName.jar (valid)
+        fileLocationWithProtocol = "file:" + fileLocation;
+        Utils.extractClassLoader(fileLocationWithProtocol);
+    }
+
+    @Test
+    public void testValidateHttpFileUrl() throws Exception {
+
+        String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
+        Utils.extractClassLoader(jarHttpUrl);
+
+        jarHttpUrl = "http://_invalidurl_.com";
+        try {
+            // eg: fileLocation : /dir/fileName.jar (invalid)
+            Utils.extractClassLoader(jarHttpUrl);
+            Assert.fail("should fail with invalid url: without protocol");
+        } catch (Exception ie) {
+            // Ok.. expected exception
+        }
+    }
+}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/WindowConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/WindowConfigUtilsTest.java
new file mode 100644
index 0000000000..7b9fc6857a
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/WindowConfigUtilsTest.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.testng.Assert.fail;
+
+/**
+ * Unit test of {@link Exceptions}.
+ */
+public class WindowConfigUtilsTest {
+
+    @Test
+    public void testSettingSlidingCountWindow() throws Exception {
+        final Object[][] args = new Object[][]{
+                {-1, 10},
+                {10, -1},
+                {0, 10},
+                {10, 0},
+                {0, 0},
+                {-1, -1},
+                {5, 10},
+                {1, 1},
+                {10, 5},
+                {100, 10},
+                {100, 100},
+                {200, 100},
+                {500, 100},
+                {null, null},
+                {null, 1},
+                {1, null},
+                {null, -1},
+                {-1, null}
+        };
+
+        for (Object[] arg : args) {
+            Object arg0 = arg[0];
+            Object arg1 = arg[1];
+            try {
+
+                Integer windowLengthCount = null;
+                if (arg0 != null) {
+                    windowLengthCount = (Integer) arg0;
+                }
+                Integer slidingIntervalCount = null;
+                if (arg1 != null) {
+                    slidingIntervalCount = (Integer) arg1;
+                }
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setWindowLengthCount(windowLengthCount);
+                windowConfig.setSlidingIntervalCount(slidingIntervalCount);
+
+                WindowConfigUtils.validate(windowConfig);
+
+                if (arg0 == null) {
+                    fail(String.format("Window length cannot be null -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+                if ((Integer) arg0 <= 0) {
+                    fail(String.format("Window length cannot be zero or less -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+                if (arg1 != null && (Integer) arg1 <= 0) {
+                    fail(String.format("Sliding interval length cannot be zero or less -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && arg1 != null && (Integer) arg0 > 0 && (Integer) arg1 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- windowLengthCount: %s "
+                            + "slidingIntervalCount: %s", e.getMessage(), arg0, arg1));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSettingSlidingTimeWindow() throws Exception {
+        final Object[][] args = new Object[][]{
+                {-1L, 10L},
+                {10L, -1L},
+                {0L, 10L},
+                {10L, 0L},
+                {0L, 0L},
+                {-1L, -1L},
+                {5L, 10L},
+                {1L, 1L},
+                {10L, 5L},
+                {100L, 10L},
+                {100L, 100L},
+                {200L, 100L},
+                {500L, 100L},
+                {null, null},
+                {null, 1L},
+                {1L, null},
+                {null, -1L},
+                {-1L, null}
+        };
+
+        for (Object[] arg : args) {
+            Object arg0 = arg[0];
+            Object arg1 = arg[1];
+            try {
+                Long windowLengthDuration = null;
+                if (arg0 != null) {
+                    windowLengthDuration = (Long) arg0;
+                }
+                Long slidingIntervalDuration = null;
+                if (arg1 != null) {
+                    slidingIntervalDuration = (Long) arg1;
+                }
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setWindowLengthDurationMs(windowLengthDuration);
+                windowConfig.setSlidingIntervalDurationMs(slidingIntervalDuration);
+                WindowConfigUtils.validate(windowConfig);
+
+                if (arg0 == null) {
+                    fail(String.format("Window length cannot be null -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+                if ((Long) arg0 <= 0) {
+                    fail(String.format("Window length cannot be zero or less -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+                if (arg1 != null && (Long) arg1 <= 0) {
+                    fail(String.format("Sliding interval length cannot be zero or less -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && arg1 != null && (Long) arg0 > 0 && (Long) arg1 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- windowLengthDuration: %s "
+                            + "slidingIntervalDuration: %s", e.getMessage(), arg0, arg1));
+                }
+            }
+        }
+    }
+
+
+    @Test
+    public void testSettingTumblingCountWindow() throws Exception {
+        final Object[] args = new Object[]{-1, 0, 1, 2, 5, 10, null};
+
+        for (Object arg : args) {
+            Object arg0 = arg;
+            try {
+
+                Integer windowLengthCount = null;
+                if (arg0 != null) {
+                    windowLengthCount = (Integer) arg0;
+                }
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setWindowLengthCount(windowLengthCount);
+                WindowConfigUtils.validate(windowConfig);
+
+                if (arg0 == null) {
+                    fail(String.format("Window length cannot be null -- windowLengthCount: %s", arg0));
+                }
+                if ((Integer) arg0 <= 0) {
+                    fail(String.format("Window length cannot be zero or less -- windowLengthCount: %s",
+                            arg0));
+                }
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && (Integer) arg0 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- windowLengthCount: %s", e
+                            .getMessage(), arg0));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSettingTumblingTimeWindow() throws Exception {
+        final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+        for (Object arg : args) {
+            Object arg0 = arg;
+            try {
+
+                Long windowLengthDuration = null;
+                if (arg0 != null) {
+                    windowLengthDuration = (Long) arg0;
+                }
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setWindowLengthDurationMs(windowLengthDuration);
+                WindowConfigUtils.validate(windowConfig);
+
+                if (arg0 == null) {
+                    fail(String.format("Window count duration cannot be null -- windowLengthDuration: %s",
+                            arg0));
+                }
+                if ((Long) arg0 <= 0) {
+                    fail(String.format("Window length cannot be zero or less -- windowLengthDuration: %s",
+                            arg0));
+                }
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && (Long) arg0 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- windowLengthDuration: %s", e
+                            .getMessage(), arg0));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSettingLagTime() throws Exception {
+        final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+        for (Object arg : args) {
+            Object arg0 = arg;
+            try {
+
+                Long maxLagMs = null;
+                if (arg0 != null) {
+                    maxLagMs = (Long) arg0;
+                }
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setWindowLengthCount(1);
+                windowConfig.setSlidingIntervalCount(1);
+                windowConfig.setMaxLagMs(maxLagMs);
+                windowConfig.setTimestampExtractorClassName("SomeClass");
+                WindowConfigUtils.validate(windowConfig);
+
+                if(arg0 != null && (Long) arg0 < 0) {
+                    fail(String.format("Window lag cannot be less than zero -- lagTime: %s", arg0));
+                }
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && (Long) arg0 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- lagTime: %s",
+                            e.getMessage(), arg0));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSettingWaterMarkInterval() throws Exception {
+        final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+        for (Object arg : args) {
+            Object arg0 = arg;
+            try {
+                Long watermarkEmitInterval = null;
+                if (arg0 != null) {
+                    watermarkEmitInterval = (Long) arg0;
+                }
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setWindowLengthCount(1);
+                windowConfig.setSlidingIntervalCount(1);
+                windowConfig.setWatermarkEmitIntervalMs(watermarkEmitInterval);
+                windowConfig.setTimestampExtractorClassName("SomeClass");
+                WindowConfigUtils.validate(windowConfig);
+
+                if (arg0 != null && (Long) arg0 <= 0) {
+                    fail(String.format("Watermark interval cannot be zero or less -- watermarkInterval: "
+                            + "%s", arg0));
+                }
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && (Long) arg0 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- watermarkInterval: %s", e
+                            .getMessage(), arg0));
+                }
+            }
+        }
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index f9359916a8..ebb68d54e7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -18,22 +18,16 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
-import java.net.MalformedURLException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
-import java.util.Collections;
 import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
 
@@ -49,7 +43,6 @@
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
 import org.apache.zookeeper.KeeperException.Code;
@@ -62,25 +55,6 @@
 
     private Utils(){}
 
-    public static Object getObject(byte[] byteArr) throws IOException, ClassNotFoundException {
-        Object obj = null;
-        ByteArrayInputStream bis = null;
-        ObjectInputStream ois = null;
-        try {
-            bis = new ByteArrayInputStream(byteArr);
-            ois = new ObjectInputStream(bis);
-            obj = ois.readObject();
-        } finally {
-            if (bis != null) {
-                bis.close();
-            }
-            if (ois != null) {
-                ois.close();
-            }
-        }
-        return obj;
-    }
-
     public static byte[] toByteArray(Object obj) throws IOException {
         byte[] bytes = null;
         ByteArrayOutputStream bos = null;
@@ -136,61 +110,6 @@ public static void uploadToBookeeper(Namespace dlogNamespace,
             }
         }
     }
-
-    public static ClassLoader validateFileUrl(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException {
-        if (destPkgUrl.startsWith(FILE)) {
-            URL url = new URL(destPkgUrl);
-            File file = new File(url.toURI());
-            if (!file.exists()) {
-                throw new IOException(destPkgUrl + " does not exists locally");
-            }
-            try {
-                return Reflections.loadJar(file);
-            } catch (MalformedURLException e) {
-                throw new IllegalArgumentException(
-                        "Corrupt User PackageFile " + file + " with error " + e.getMessage());
-            }
-        } else if (destPkgUrl.startsWith("http")) {
-            URL website = new URL(destPkgUrl);
-            File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString());
-            ReadableByteChannel rbc = Channels.newChannel(website.openStream());
-            try (FileOutputStream fos = new FileOutputStream(tempFile)) {
-                fos.getChannel().transferFrom(rbc, 0, 10);
-            }
-            if (tempFile.exists()) {
-                tempFile.delete();
-            }
-            return null;
-        } else {
-            throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
-        }
-    }
-
-    public static NarClassLoader extractNarClassloader(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException {
-        if (destPkgUrl.startsWith(FILE)) {
-            URL url = new URL(destPkgUrl);
-            File file = new File(url.toURI());
-            if (!file.exists()) {
-                throw new IllegalArgumentException(destPkgUrl + " does not exists locally");
-            }
-            try {
-                return NarClassLoader.getFromArchive(file, Collections.emptySet());
-            } catch (MalformedURLException e) {
-                throw new IllegalArgumentException(
-                        "Corrupt User PackageFile " + file + " with error " + e.getMessage());
-            }
-        } else if (destPkgUrl.startsWith("http")) {
-            URL website = new URL(destPkgUrl);
-            File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString());
-            if (!tempFile.exists()) {
-                throw new IllegalArgumentException("Could not create local file " + tempFile);
-            }
-            tempFile.deleteOnExit();
-            return NarClassLoader.getFromArchive(tempFile, Collections.emptySet());
-        } else {
-            throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
-        }
-    }
     
     public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outputStream) throws IOException {
         URL website = new URL(destPkgUrl);
@@ -198,13 +117,6 @@ public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outpu
         outputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
     }
 
-    public static File downloadFromHttpUrl(String destPkgUrl, String fileName) throws IOException {
-        File tempPkgFile = File.createTempFile(fileName, "function");
-        tempPkgFile.deleteOnExit();
-        downloadFromHttpUrl(destPkgUrl, new FileOutputStream(tempPkgFile));
-        return tempPkgFile;
-    }
-
     public static void downloadFromBookkeeper(Namespace namespace,
                                                  OutputStream outputStream,
                                                  String packagePath) throws IOException {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 0eebe794cf..d965c3ce94 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -23,10 +23,6 @@
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.pulsar.functions.utils.Reflections.createInstance;
-import static org.apache.pulsar.functions.utils.Reflections.loadJar;
-import static org.apache.pulsar.functions.utils.Utils.FILE;
-import static org.apache.pulsar.functions.utils.Utils.HTTP;
-import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
 
 import com.google.gson.Gson;
 
@@ -34,7 +30,6 @@
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import java.io.*;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -64,6 +59,7 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.join;
+import static org.apache.pulsar.functions.utils.Utils.*;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -86,7 +82,6 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.*;
-import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.Utils;
@@ -1040,26 +1035,41 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
 
         if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
             FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            ClassLoader clsLoader = null;
-            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-                clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
-            }
-            if (functionConfig.getRuntime() == null) {
-                throw new IllegalArgumentException("Function Runtime no specified");
-            }
-            ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader);
+            ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, uploadedInputStreamAsFile);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
         }
         if (componentType.equals(SOURCE)) {
+            Path archivePath = null;
             SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
-            NarClassLoader clsLoader = extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, true);
-            ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
+            if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
+                String builtinArchive = sourceConfig.getArchive();
+                if (builtinArchive.startsWith(BUILTIN)) {
+                    builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+                }
+                try {
+                    archivePath = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(String.format("No Source archive %s found", archivePath));
+                }
+            }
+            NarClassLoader clsLoader = SourceConfigUtils.validate(sourceConfig, archivePath, functionPkgUrl, uploadedInputStreamAsFile);
             return SourceConfigUtils.convert(sourceConfig, clsLoader);
         }
         if (componentType.equals(SINK)) {
+            Path archivePath = null;
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
-            NarClassLoader clsLoader = extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, false);
-            ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
+            if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
+                String builtinArchive = sinkConfig.getArchive();
+                if (builtinArchive.startsWith(BUILTIN)) {
+                    builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+                }
+                try {
+                    archivePath = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(String.format("No Sink archive %s found", archivePath));
+                }
+            }
+            NarClassLoader clsLoader = SinkConfigUtils.validate(sinkConfig, archivePath, functionPkgUrl, uploadedInputStreamAsFile);
             return SinkConfigUtils.convert(sinkConfig, clsLoader);
         }
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
@@ -1070,7 +1080,19 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
         }
         ClassLoader clsLoader = null;
         if (functionDetailsBuilder.getRuntime() == FunctionDetails.Runtime.JAVA) {
-            clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
+            if (!isEmpty(functionPkgUrl)) {
+                try {
+                    clsLoader = extractClassLoader(functionPkgUrl);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Corrupted Jar file", e);
+                }
+            } else {
+                try {
+                    clsLoader = loadJar(uploadedInputStreamAsFile);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Corrupted Jar file", e);
+                }
+            }
         }
         validateFunctionClassTypes(clsLoader, functionDetailsBuilder);
 
@@ -1107,73 +1129,6 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
         return functionDetails;
     }
 
-    private ClassLoader extractClassLoader(String functionPkgUrl, File uploadedInputStreamAsFile) throws URISyntaxException, IOException {
-        if (isNotBlank(functionPkgUrl)) {
-            return Utils.validateFileUrl(functionPkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());
-        } else if (uploadedInputStreamAsFile != null) {
-            try {
-                return loadJar(uploadedInputStreamAsFile);
-            } catch (MalformedURLException e) {
-                throw new IllegalArgumentException("Corrupted Jar File", e);
-            }
-        } else {
-            return null;
-        }
-    }
-
-    public NarClassLoader extractNarClassLoader(String archive, String pkgUrl, File uploadedInputStreamFileName,
-                                                 boolean isSource) {
-        if (!StringUtils.isEmpty(archive)) {
-            String builtinArchive = archive;
-            if (archive.startsWith(org.apache.pulsar.functions.utils.Utils.BUILTIN)) {
-                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
-            }
-            if (isSource) {
-                Path path;
-                try {
-                    path = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(String.format("No Source archive %s found", archive));
-                }
-                try {
-                    return NarClassLoader.getFromArchive(path.toFile(),
-                            Collections.emptySet());
-                } catch (IOException e) {
-                    throw new IllegalArgumentException(String.format("The source %s is corrupted", archive));
-                }
-            } else {
-                Path path;
-                try {
-                    path = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(String.format("No Sink archive %s found", archive));
-                }
-                try {
-                    return NarClassLoader.getFromArchive(path.toFile(),
-                            Collections.emptySet());
-                } catch (IOException e) {
-                    throw new IllegalArgumentException(String.format("The sink %s is corrupted", archive));
-                }
-            }
-        }
-        if (!StringUtils.isEmpty(pkgUrl)) {
-            try {
-                return Utils.extractNarClassloader(pkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());
-            } catch (Exception e) {
-                throw new IllegalArgumentException(e.getMessage());
-            }
-        }
-        if (uploadedInputStreamFileName != null) {
-            try {
-                return NarClassLoader.getFromArchive(uploadedInputStreamFileName,
-                        Collections.emptySet());
-            } catch (IOException e) {
-                throw new IllegalArgumentException(e.getMessage());
-            }
-        }
-        return null;
-    }
-
     private void validateFunctionClassTypes(ClassLoader classLoader, FunctionDetails.Builder functionDetailsBuilder) {
 
         // validate only if classLoader is provided
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java
index b2a92f4252..77f95d0ffa 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java
@@ -22,7 +22,6 @@
 import java.io.FileOutputStream;
 import java.util.UUID;
 
-import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -31,42 +30,6 @@
  */
 public class UtilsTest {
 
-    @Test
-    public void testValidateLocalFileUrl() throws Exception {
-        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        String testDir = UtilsTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        try {
-            // eg: fileLocation : /dir/fileName.jar (invalid)
-            Utils.validateFileUrl(fileLocation, testDir);
-            Assert.fail("should fail with invalid url: without protocol");
-        } catch (IllegalArgumentException ie) {
-            // Ok.. expected exception
-        }
-        String fileLocationWithProtocol = "file://" + fileLocation;
-        // eg: fileLocation : file:///dir/fileName.jar (valid)
-        Utils.validateFileUrl(fileLocationWithProtocol, testDir);
-        // eg: fileLocation : file:/dir/fileName.jar (valid)
-        fileLocationWithProtocol = "file:" + fileLocation;
-        Utils.validateFileUrl(fileLocationWithProtocol, testDir);
-    }
-
-    @Test
-    public void testValidateHttpFileUrl() throws Exception {
-
-        String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
-        String testDir = UtilsTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        Utils.validateFileUrl(jarHttpUrl, testDir);
-
-        jarHttpUrl = "http://_invalidurl_.com";
-        try {
-            // eg: fileLocation : /dir/fileName.jar (invalid)
-            Utils.validateFileUrl(jarHttpUrl, testDir);
-            Assert.fail("should fail with invalid url: without protocol");
-        } catch (Exception ie) {
-            // Ok.. expected exception
-        }
-    }
-
     @Test
     public void testDownloadFile() throws Exception {
         String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 460d0509ff..296dba2654 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -261,7 +261,7 @@ public void testRegisterFunctionMissingClassName() throws IOException {
                 outputSerdeClassName,
             null,
             parallelism,
-                "Field 'className' cannot be null!");
+                "Function classname cannot be null");
     }
 
     private void testRegisterFunctionMissingArguments(
@@ -546,7 +546,7 @@ public void testUpdateFunctionMissingClassName() throws IOException {
                 outputSerdeClassName,
             null,
             parallelism,
-                "Field 'className' cannot be null!");
+                "Function classname cannot be null");
     }
 
     private void testUpdateFunctionMissingArguments(
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 315f56d2f0..332534907c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -146,9 +146,9 @@ public void setup() throws Exception {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
         mockStatic(SinkConfigUtils.class);
         when(SinkConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+        when(SinkConfigUtils.validate(any(), any(), any(), any())).thenReturn(null);
         Mockito.doReturn("Sink").when(this.resource).calculateSubjectType(any());
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index eee684f4d6..d11cf7ee4d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -135,9 +135,9 @@ public void setup() throws Exception {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
         mockStatic(SourceConfigUtils.class);
         when(SourceConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+        when(SourceConfigUtils.validate(any(), any(), any(), any())).thenReturn(null);
         Mockito.doReturn("Source").when(this.resource).calculateSubjectType(any());
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services