You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/30 19:51:13 UTC

[GitHub] jerrypeng closed pull request #1859: improving config validation

jerrypeng closed pull request #1859: improving config validation
URL: https://github.com/apache/incubator-pulsar/pull/1859
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index de4ab40ca8..1cc239db63 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -86,7 +86,12 @@ public IObjectFactory getObjectFactory() {
     private Functions functions;
     private CmdFunctions cmd;
 
-    public class DummyFunction implements Function<String, String> {
+    public static class DummyFunction implements Function<String, String> {
+
+        public DummyFunction() {
+
+        }
+
         @Override
         public String process(String input, Context context) throws Exception {
             return null;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index d565568602..94e17ad17b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -18,14 +18,8 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.isNull;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,31 +29,12 @@
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
+import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -70,10 +45,7 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
@@ -87,10 +59,33 @@
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
 import org.apache.pulsar.functions.windowing.WindowUtils;
 
-import net.jodah.typetools.TypeResolver;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -228,7 +223,7 @@ void processArguments() throws Exception {
         @Parameter(names = "--userConfig", description = "User-defined config key/values")
         protected String userConfigString;
         @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. the number of function instances to run)")
-        protected String parallelism;
+        protected Integer parallelism;
         @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)")
         protected Double cpu;
         @Parameter(names = "--ram", description = "The ram in bytes that need to be allocated per function instance(applicable only to process/docker runtime)")
@@ -275,19 +270,14 @@ void processArguments() throws Exception {
 
             if (null != inputs) {
                 List<String> inputTopics = Arrays.asList(inputs.split(","));
-                inputTopics.forEach(this::validateTopicName);
                 functionConfig.setInputs(inputTopics);
             }
             if (null != customSerdeInputString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
-                customSerdeInputMap.forEach((topic, serde) -> {
-                    validateTopicName(topic);
-                });
                 functionConfig.setCustomSerdeInputs(customSerdeInputMap);
             }
             if (null != output) {
-                validateTopicName(output);
                 functionConfig.setOutput(output);
             }
             if (null != logTopic) {
@@ -320,37 +310,12 @@ void processArguments() throws Exception {
                 functionConfig.setUserConfig(new HashMap<>());
             }
 
-            if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) {
-                throw new RuntimeException("No input topic(s) specified for the function");
-            }
-
-            // Ensure that topics aren't being used as both input and output
-            verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput());
-
-            if (parallelism == null) {
-                if (functionConfig.getParallelism() == 0) {
-                    functionConfig.setParallelism(1);
-                }
-            } else {
-                int num = Integer.parseInt(parallelism);
-                if (num <= 0) {
-                    throw new IllegalArgumentException("The parallelism factor (the number of instances) for the function must be positive");
-                }
-                functionConfig.setParallelism(num);
+            if (parallelism != null) {
+                functionConfig.setParallelism(parallelism);
             }
 
-            com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0, "The cpu allocation for the function must be positive");
-            com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0, "The ram allocation for the function must be positive");
-            com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0, "The disk allocation for the function must be positive");
             functionConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk));
 
-            if (functionConfig.getSubscriptionType() != null
-                    && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER
-                    && functionConfig.getProcessingGuarantees() != null
-                    && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type");
-            }
-
             // window configs
             WindowConfig windowConfig = functionConfig.getWindowConfig();
             if (null != windowLengthCount) {
@@ -377,15 +342,7 @@ void processArguments() throws Exception {
                 }
                 windowConfig.setSlidingIntervalDurationMs(slidingIntervalDurationMs);
             }
-            if (windowConfig != null) {
-                WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig);
-                // set auto ack to false since windowing framework is responsible
-                // for acking and not the function framework
-                if (autoAck != null && autoAck == true) {
-                    throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
-                }
-                functionConfig.setAutoAck(false);
-            }
+
             functionConfig.setWindowConfig(windowConfig);
 
             if  (null != autoAck) {
@@ -394,188 +351,43 @@ void processArguments() throws Exception {
                 functionConfig.setAutoAck(true);
             }
 
-            inferMissingArguments(functionConfig);
 
             if (null != jarFile) {
-                doJavaSubmitChecks(functionConfig);
                 functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
                 userCodeFile = jarFile;
             } else if (null != pyFile) {
-                doPythonSubmitChecks(functionConfig);
                 functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
                 userCodeFile = pyFile;
             } else {
                 throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function");
             }
-        }
 
-        private Class<?>[] getFunctionTypes(File file, FunctionConfig functionConfig) {
-            assertClassExistsInJar(file);
-
-            Object userClass = Reflections.createInstance(functionConfig.getClassName(), file);
-            Class<?>[] typeArgs;
-            // if window function
-            if (functionConfig.getWindowConfig() != null) {
-                java.util.function.Function function = (java.util.function.Function) userClass;
-                if (function == null) {
-                    throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated from jar %s",
-                            functionConfig.getClassName(), jarFile));
-                }
-                typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
-                if (!typeArgs[0].equals(Collection.class)) {
-                    throw new IllegalArgumentException("Window function must take a collection as input");
-                }
-                Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
-                Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
-                Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
-                typeArgs[0] = (Class<?>) actualInputType;
-            } else {
-                if (userClass instanceof Function) {
-                    Function pulsarFunction = (Function) userClass;
-                    if (pulsarFunction == null) {
-                        throw new IllegalArgumentException(String.format("The Pulsar function class %s could not be instantiated from jar %s",
-                                functionConfig.getClassName(), jarFile));
-                    }
-                    typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
-                } else {
-                    java.util.function.Function function = (java.util.function.Function) userClass;
-                    if (function == null) {
-                        throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated from jar %s",
-                                functionConfig.getClassName(), jarFile));
-                    }
-                    typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
-                }
-            }
+            // infer default vaues
+            inferMissingArguments(functionConfig);
 
-            return typeArgs;
+            // check if function configs are valid
+            validateFunctionConfigs(functionConfig);
         }
 
-        private void assertClassExistsInJar(File file) {
-            if (!Reflections.classExistsInJar(file, functionConfig.getClassName())) {
-                throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s",
-                        functionConfig.getClassName(), jarFile));
-            } else if (!Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), Function.class)
-                    && !Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), java.util.function.Function.class)) {
-                throw new IllegalArgumentException(String.format("The Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function",
-                        functionConfig.getClassName(), jarFile));
-            }
-        }
+        private void validateFunctionConfigs(FunctionConfig functionConfig) {
 
-        private void doJavaSubmitChecks(FunctionConfig functionConfig) {
-            if (isNull(functionConfig.getClassName())) {
-                throw new IllegalArgumentException("You supplied a jar file but no main class");
-            }
-
-            File file = new File(jarFile);
-            ClassLoader userJarLoader;
-            try {
-                userJarLoader = Reflections.loadJar(file);
-            } catch (MalformedURLException e) {
-                throw new RuntimeException("Failed to load user jar " + file, e);
-            }
-            Class<?>[] typeArgs = getFunctionTypes(file, functionConfig);
-            // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
-            // implements SerDe class
-            functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
-                if (!Reflections.classExists(inputSerializer)
-                        && !Reflections.classExistsInJar(new File(jarFile), inputSerializer)) {
-                    throw new IllegalArgumentException(
-                            String.format("The input serialization/deserialization class %s does not exist",
-                                    inputSerializer));
-                } else if (Reflections.classExists(inputSerializer)) {
-                    if (!Reflections.classImplementsIface(inputSerializer, SerDe.class)) {
-                        throw new IllegalArgumentException(String.format("The input serialization/deserialization class %s does not not implement %s",
-                                inputSerializer, SerDe.class.getCanonicalName()));
-                    }
-                } else if (Reflections.classExistsInJar(new File(jarFile), inputSerializer)) {
-                    if (!Reflections.classInJarImplementsIface(new File(jarFile), inputSerializer, SerDe.class)) {
-                        throw new IllegalArgumentException(String.format("The input serialization/deserialization class %s does not not implement %s",
-                                inputSerializer, SerDe.class.getCanonicalName()));
-                    }
-                }
-                if (inputSerializer.equals(DefaultSerDe.class.getName())) {
-                    if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
-                        throw new RuntimeException("The default Serializer does not support type " + typeArgs[0]);
-                    }
-                } else {
-                    SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, file);
-                    if (serDe == null) {
-                        throw new IllegalArgumentException(String.format("The SerDe class %s does not exist in jar %s",
-                                inputSerializer, jarFile));
-                    }
-                    Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
-                    // type inheritance information seems to be lost in generic type
-                    // load the actual type class for verification
-                    Class<?> fnInputClass;
-                    Class<?> serdeInputClass;
-                    try {
-                        fnInputClass = Class.forName(typeArgs[0].getName(), true, userJarLoader);
-                        serdeInputClass = Class.forName(serDeTypes[0].getName(), true, userJarLoader);
-                    } catch (ClassNotFoundException e) {
-                        throw new RuntimeException("Failed to load type class", e);
-                    }
-
-                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                        throw new RuntimeException("Serializer type mismatch " + typeArgs[0] + " vs " + serDeTypes[0]);
-                    }
-                }
-            });
-            functionConfig.getInputs().forEach((topicName) -> {
-                if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
-                    throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]);
-                }
-            });
-            if (!Void.class.equals(typeArgs[1])) {
-                if (functionConfig.getOutputSerdeClassName() == null
-                        || functionConfig.getOutputSerdeClassName().isEmpty()
-                        || functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
-                    if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
-                        throw new RuntimeException("Default Serializer does not support type " + typeArgs[1]);
-                    }
-                } else {
-                    SerDe serDe = (SerDe) Reflections.createInstance(functionConfig.getOutputSerdeClassName(), file);
-                    if (serDe == null) {
-                        throw new IllegalArgumentException(String.format("SerDe class %s does not exist in jar %s",
-                                functionConfig.getOutputSerdeClassName(), jarFile));
-                    }
-                    Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
-                    // type inheritance information seems to be lost in generic type
-                    // load the actual type class for verification
-                    Class<?> fnOutputClass;
-                    Class<?> serdeOutputClass;
-                    try {
-                        fnOutputClass = Class.forName(typeArgs[1].getName(), true, userJarLoader);
-                        serdeOutputClass = Class.forName(serDeTypes[0].getName(), true, userJarLoader);
-                    } catch (ClassNotFoundException e) {
-                        throw new RuntimeException("Failed to load type class", e);
-                    }
-
-                    if (!serdeOutputClass.isAssignableFrom(fnOutputClass)) {
-                        throw new RuntimeException("Serializer type mismatch " + typeArgs[1] + " vs " + serDeTypes[0]);
-                    }
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                File file = new File(jarFile);
+                ClassLoader userJarLoader;
+                try {
+                    userJarLoader = Reflections.loadJar(file);
+                } catch (MalformedURLException e) {
+                    throw new RuntimeException("Failed to load user jar " + file, e);
                 }
-            }
-        }
-
-        private void doPythonSubmitChecks(FunctionConfig functionConfig) {
-            if (functionConfig.getClassName() == null) {
-                throw new IllegalArgumentException("You specified a Python file but no main class name");
-            }
-
-            if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
+                // make sure the function class loader is accessible thread-locally
+                Thread.currentThread().setContextClassLoader(userJarLoader);
             }
 
-            if (functionConfig.getWindowConfig() != null) {
-                throw new IllegalArgumentException("There is currently no support windowing in python");
-            }
-        }
-
-        private void validateTopicName(String topic) {
-            if (!TopicName.isValid(topic)) {
-                throw new IllegalArgumentException(String.format("The topic name %s is invalid", topic));
+            try {
+                // Need to load jar and set context class loader before calling
+                ConfigValidation.validateConfig(functionConfig);
+            } catch (Exception e) {
+                throw new ParameterException(e.getMessage());
             }
         }
 
@@ -592,6 +404,21 @@ private void inferMissingArguments(FunctionConfig functionConfig) {
             if (StringUtils.isEmpty(functionConfig.getOutput())) {
                 inferMissingOutput(functionConfig);
             }
+
+            if (functionConfig.getParallelism() == 0) {
+                functionConfig.setParallelism(1);
+            }
+
+            WindowConfig windowConfig = functionConfig.getWindowConfig();
+            if (windowConfig != null) {
+                WindowUtils.inferDefaultConfigs(windowConfig);
+                // set auto ack to false since windowing framework is responsible
+                // for acking and not the function framework
+                if (autoAck != null && autoAck == true) {
+                    throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+                }
+                functionConfig.setAutoAck(false);
+            }
         }
 
         private void inferMissingFunctionName(FunctionConfig functionConfig) {
@@ -649,7 +476,7 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
                 } catch (MalformedURLException e) {
                     throw new RuntimeException("Failed to load user jar " + file, e);
                 }
-                typeArgs = getFunctionTypes(file, functionConfig);
+                typeArgs = Utils.getFunctionTypes(functionConfig);
             }
 
             FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index a82a195a79..8f16eacd37 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -25,6 +25,7 @@
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
 import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
 import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
 import org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy;
@@ -93,7 +94,9 @@ private WindowConfig getWindowConfigs(Context context) {
                 (new Gson().toJson(context.getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY).get())),
                 WindowConfig.class);
 
-        WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig);
+
+        WindowUtils.inferDefaultConfigs(windowConfig);
+        ValidatorImpls.WindowConfigValidator.validateWindowConfig(windowConfig);
         return windowConfig;
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
index 6de61c2abb..73dda87b61 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
@@ -25,44 +25,7 @@ public static String getFullyQualifiedName(String tenant, String namespace, Stri
         return String.format("%s/%s/%s", tenant, namespace, name);
     }
 
-    public static void validateAndSetDefaultsWindowConfig(WindowConfig windowConfig) {
-        if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) {
-            throw new IllegalArgumentException("Window length is not specified");
-        }
-
-        if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) {
-            throw new IllegalArgumentException(
-                    "Window length for time and count are set! Please set one or the other.");
-        }
-
-        if (windowConfig.getWindowLengthCount() != null) {
-            if (windowConfig.getWindowLengthCount() <= 0) {
-                throw new IllegalArgumentException(
-                        "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
-            }
-        }
-
-        if (windowConfig.getWindowLengthDurationMs() != null) {
-            if (windowConfig.getWindowLengthDurationMs() <= 0) {
-                throw new IllegalArgumentException(
-                        "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
-            }
-        }
-
-        if (windowConfig.getSlidingIntervalCount() != null) {
-            if (windowConfig.getSlidingIntervalCount() <= 0) {
-                throw new IllegalArgumentException(
-                        "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
-            }
-        }
-
-        if (windowConfig.getSlidingIntervalDurationMs() != null) {
-            if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
-                throw new IllegalArgumentException(
-                        "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
-            }
-        }
-
+    public static void inferDefaultConfigs(WindowConfig windowConfig) {
         if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getSlidingIntervalDurationMs() == null) {
             windowConfig.setSlidingIntervalDurationMs(windowConfig.getWindowLengthDurationMs());
         }
@@ -72,20 +35,10 @@ public static void validateAndSetDefaultsWindowConfig(WindowConfig windowConfig)
         }
 
         if (windowConfig.getTimestampExtractorClassName() != null) {
-            if (windowConfig.getMaxLagMs() != null) {
-                if (windowConfig.getMaxLagMs() <= 0) {
-                    throw new IllegalArgumentException(
-                            "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
-                }
-            } else {
+            if (windowConfig.getMaxLagMs() == null) {
                 windowConfig.setMaxLagMs(WindowFunctionExecutor.DEFAULT_MAX_LAG_MS);
             }
-            if (windowConfig.getWatermarkEmitIntervalMs() != null) {
-                if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
-                    throw new IllegalArgumentException(
-                            "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
-                }
-            } else {
+            if (windowConfig.getWatermarkEmitIntervalMs() == null) {
                 windowConfig.setWatermarkEmitIntervalMs(WindowFunctionExecutor.DEFAULT_WATERMARK_EVENT_INTERVAL_MS);
             }
         }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index 33565bc07d..c5d5d4ac22 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -537,8 +537,8 @@ public void testSettingLagTime() throws Exception {
                 if (arg0 == null) {
                     Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs(),
                             new Long(testWindowedPulsarFunction.DEFAULT_MAX_LAG_MS));
-                } else if((Long) arg0 <= 0) {
-                    fail(String.format("Window lag cannot be zero or less -- lagTime: %s", arg0));
+                } else if((Long) arg0 < 0) {
+                    fail(String.format("Window lag cannot be less than zero -- lagTime: %s", arg0));
                 } else {
                     Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs().longValue(),
                             maxLagMs.longValue());
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index b278ed753d..01710b1c5a 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -82,6 +82,11 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>typetools</artifactId>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index ffdda63e3f..2940e32da5 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -23,10 +23,21 @@
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidFunctionConfig;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidWindowConfig;
+import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
 
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
 
 @Getter
@@ -34,6 +45,7 @@
 @Data
 @EqualsAndHashCode
 @ToString
+@isValidFunctionConfig
 public class FunctionConfig {
 
     public enum ProcessingGuarantees {
@@ -61,25 +73,37 @@
         PYTHON
     }
 
+
+    @NotNull
     private String tenant;
+    @NotNull
     private String namespace;
+    @NotNull
     private String name;
+    @NotNull
+    @isImplementationOfClasses(implementsClasses = {Function.class, java.util.function.Function.class})
     private String className;
-
+    @isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
     private Collection<String> inputs;
+    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
+            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
     private Map<String, String> customSerdeInputs;
-
+    @isValidTopicName
     private String output;
+    @isImplementationOfClass(implementsClass = SerDe.class)
     private String outputSerdeClassName;
-
+    @isValidTopicName
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
     private Map<String, Object> userConfig;
     private SubscriptionType subscriptionType;
     private Runtime runtime;
     private boolean autoAck;
+    @isPositiveNumber
     private int parallelism;
+    @isValidResources
     private Resources resources;
     private String fqfn;
+    @isValidWindowConfig
     private WindowConfig windowConfig;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 68ac86b155..2963e571fa 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -21,17 +21,28 @@
 import com.google.protobuf.AbstractMessage.Builder;
 import com.google.protobuf.MessageOrBuilder;
 import com.google.protobuf.util.JsonFormat;
+
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.net.ServerSocket;
+import java.util.Collection;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.functions.api.Function;
+
+import net.jodah.typetools.TypeResolver;
 
 /**
  * Utils used for runtime.
  */
+@Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class Utils {
 
@@ -76,4 +87,66 @@ public static int findAvailablePort() {
             throw new RuntimeException("No free port found", ex);
         }
     }
+
+    public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) {
+
+        Object userClass = createInstance(functionConfig.getClassName(), Thread.currentThread().getContextClassLoader());
+
+        Class<?>[] typeArgs;
+        // if window function
+        if (functionConfig.getWindowConfig() != null) {
+            java.util.function.Function function = (java.util.function.Function) userClass;
+            if (function == null) {
+                throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated",
+                        functionConfig.getClassName()));
+            }
+            typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
+            if (!typeArgs[0].equals(Collection.class)) {
+                throw new IllegalArgumentException("Window function must take a collection as input");
+            }
+            Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
+            Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
+            Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
+            typeArgs[0] = (Class<?>) actualInputType;
+        } else {
+            if (userClass instanceof Function) {
+                Function pulsarFunction = (Function) userClass;
+                typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
+            } else {
+                java.util.function.Function function = (java.util.function.Function) userClass;
+                typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
+            }
+        }
+
+        return typeArgs;
+    }
+
+    public static Object createInstance(String userClassName, ClassLoader classLoader) {
+        Class<?> theCls;
+        try {
+            theCls = Class.forName(userClassName);
+        } catch (ClassNotFoundException cnfe) {
+            try {
+                theCls = Class.forName(userClassName, true, classLoader);
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException("User class must be in class path", cnfe);
+            }
+        }
+        Object result;
+        try {
+            Constructor<?> meth = theCls.getDeclaredConstructor();
+            meth.setAccessible(true);
+            result = meth.newInstance();
+        } catch (InstantiationException ie) {
+            throw new RuntimeException("User class must be concrete", ie);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException("User class doesn't have such method", e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("User class must have a no-arg constructor", e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException("User class constructor throws exception", e);
+        }
+        return result;
+
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
new file mode 100644
index 0000000000..b665d1a69c
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class ConfigValidation {
+
+    public static void validateConfig(Object config) {
+        for (Field field : config.getClass().getDeclaredFields()) {
+            Object value = null;
+            field.setAccessible(true);
+            try {
+                value = field.get(config);
+            } catch (IllegalAccessException e) {
+               throw new RuntimeException(e);
+            }
+            validateField(field, value);
+        }
+        validateClass(config);
+    }
+
+    private static void validateClass(Object config) {
+        processAnnotations(config.getClass().getAnnotations(), config.getClass().getName(), config);
+    }
+
+    private static void validateField(Field field, Object value) {
+        processAnnotations(field.getAnnotations(), field.getName(), value);
+    }
+
+    private static void processAnnotations(Annotation[] annotations, String fieldName, Object value) {
+        try {
+            for (Annotation annotation : annotations) {
+
+                String type = annotation.annotationType().getName();
+                Class<?> validatorClass = null;
+                Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
+                //check if annotation is one of our
+                for (Class<?> clazz : classes) {
+                    if (clazz.getName().equals(type)) {
+                        validatorClass = clazz;
+                        break;
+                    }
+                }
+                if (validatorClass != null) {
+                    Object v = validatorClass.cast(annotation);
+                    @SuppressWarnings("unchecked")
+                    Class<Validator> clazz = (Class<Validator>) validatorClass
+                            .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
+                    Validator o = null;
+                    Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
+                    //two constructor signatures used to initialize validators.
+                    //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
+                    //If validator has a constructor that takes a Map as an argument call that constructor
+                    if (hasConstructor(clazz, Map.class)) {
+                        o = clazz.getConstructor(Map.class).newInstance(params);
+                    } else { //If not call default constructor
+                        o = clazz.newInstance();
+                    }
+                    o.validateField(fieldName, value);
+                }
+            }
+        } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Map<String, Object> getParamsFromAnnotation(Class<?> validatorClass, Object v)
+            throws InvocationTargetException, IllegalAccessException {
+        Map<String, Object> params = new HashMap<String, Object>();
+        for (Method method : validatorClass.getDeclaredMethods()) {
+
+            Object value = null;
+            try {
+                value = (Object) method.invoke(v);
+            } catch (IllegalArgumentException ex) {
+                value = null;
+            }
+            if (value != null) {
+                params.put(method.getName(), value);
+            }
+        }
+        return params;
+    }
+
+    public static boolean hasConstructor(Class<?> clazz, Class<?> paramClass) {
+        Class<?>[] classes = { paramClass };
+        try {
+            clazz.getConstructor(classes);
+        } catch (NoSuchMethodException e) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
new file mode 100644
index 0000000000..f08cbba398
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+public class ConfigValidationAnnotations {
+
+    /**
+     * Validates on object is not null
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface NotNull {
+        Class<?> validatorClass() default ValidatorImpls.NotNullValidator.class;
+    }
+
+    /**
+     * Checks if a number is positive and whether zero inclusive Validator with fields: validatorClass, includeZero
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isPositiveNumber {
+        Class<?> validatorClass() default ValidatorImpls.PositiveNumberValidator.class;
+
+        boolean includeZero() default false;
+    }
+
+
+    /**
+     * Checks if resources specified are valid
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isValidResources {
+
+        Class<?> validatorClass() default ValidatorImpls.ResourcesValidator.class;
+    }
+
+    /**
+     * validates each entry in a list is of a certain type
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isListEntryType {
+        Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
+
+        Class<?> type();
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isStringList {
+        Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
+
+        Class<?> type() default String.class;
+    }
+
+    /**
+     * Validates each entry in a list with a list of validators Validators with fields: validatorClass and entryValidatorClass
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isListEntryCustom {
+        Class<?> validatorClass() default ValidatorImpls.ListEntryCustomValidator.class;
+
+        Class<?>[] entryValidatorClasses();
+    }
+
+
+    /**
+     * Validates the type of each key and value in a map Validator with fields: validatorClass, keyValidatorClass, valueValidatorClass
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isMapEntryType {
+        Class<?> validatorClass() default ValidatorImpls.MapEntryTypeValidator.class;
+
+        Class<?> keyType();
+
+        Class<?> valueType();
+    }
+
+    /**
+     * Checks if class name is assignable to the provided class/interfaces
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isImplementationOfClass {
+        Class<?> validatorClass() default ValidatorImpls.ImplementsClassValidator.class;
+
+        Class<?> implementsClass();
+    }
+
+    /**
+     * Checks if class name is assignable to ONE of the provided list class/interfaces
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isImplementationOfClasses {
+        Class<?> validatorClass() default ValidatorImpls.ImplementsClassesValidator.class;
+
+        Class<?>[] implementsClasses();
+    }
+
+    /**
+     * Validates a each key and value in a Map with a list of validators Validator with fields: validatorClass, keyValidatorClasses,
+     * valueValidatorClasses
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isMapEntryCustom {
+        Class<?> validatorClass() default ValidatorImpls.MapEntryCustomValidator.class;
+
+        Class<?>[] keyValidatorClasses();
+
+        Class<?>[] valueValidatorClasses();
+    }
+
+    /**
+     * checks if the topic name is valid
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isValidTopicName {
+        Class<?> validatorClass() default ValidatorImpls.TopicNameValidator.class;
+    }
+
+    /**
+     * checks if window configs is valid
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isValidWindowConfig {
+        Class<?> validatorClass() default ValidatorImpls.WindowConfigValidator.class;
+    }
+
+    /**
+     * checks function config as a whole to make sure all fields are valid
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target({ElementType.TYPE})
+    public @interface isValidFunctionConfig {
+        Class<?> validatorClass() default ValidatorImpls.FunctionConfigValidator.class;
+    }
+
+    /**
+     * Field names for annotations
+     */
+    public static class ValidatorParams {
+        static final String VALIDATOR_CLASS = "validatorClass";
+        static final String TYPE = "type";
+        static final String BASE_TYPE = "baseType";
+        static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses";
+        static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses";
+        static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses";
+        static final String KEY_TYPE = "keyType";
+        static final String VALUE_TYPE = "valueType";
+        static final String INCLUDE_ZERO = "includeZero";
+        static final String ACCEPTED_VALUES = "acceptedValues";
+        static final String IMPLEMENTS_CLASS = "implementsClass";
+        static final String IMPLEMENTS_CLASSES = "implementsClasses";
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java
new file mode 100644
index 0000000000..f81cf068d6
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import java.util.Map;
+
+public class ConfigValidationUtils {
+    /**
+     * Returns a new NestableFieldValidator for a given class.
+     *
+     * @param cls     the Class the field should be a type of
+     * @param notNull whether or not a value of null is valid
+     * @return a NestableFieldValidator for that class
+     */
+    public static NestableFieldValidator fv(final Class cls, final boolean notNull) {
+        return new NestableFieldValidator() {
+            @Override
+            public void validateField(String pd, String name, Object field)
+                throws IllegalArgumentException {
+                if (field == null) {
+                    if (notNull) {
+                        throw new IllegalArgumentException("Field " + name + " must not be null");
+                    } else {
+                        return;
+                    }
+                }
+                if (!cls.isInstance(field)) {
+                    throw new IllegalArgumentException(
+                        pd + name + " must be a " + cls.getName() + ". (" + field + ")");
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a List of the given Class.
+     *
+     * @param cls     the Class of elements composing the list
+     * @param notNull whether or not a value of null is valid
+     * @return a NestableFieldValidator for a list of the given class
+     */
+    public static NestableFieldValidator listFv(Class cls, boolean notNull) {
+        return listFv(fv(cls, notNull), notNull);
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a List where each item is validated by validator.
+     *
+     * @param validator used to validate each item in the list
+     * @param notNull   whether or not a value of null is valid
+     * @return a NestableFieldValidator for a list with each item validated by a different validator.
+     */
+    public static NestableFieldValidator listFv(final NestableFieldValidator validator,
+                                                final boolean notNull) {
+        return new NestableFieldValidator() {
+            @Override
+            public void validateField(String pd, String name, Object field)
+                throws IllegalArgumentException {
+
+                if (field == null) {
+                    if (notNull) {
+                        throw new IllegalArgumentException("Field " + name + " must not be null");
+                    } else {
+                        return;
+                    }
+                }
+                if (field instanceof Iterable) {
+                    for (Object e : (Iterable) field) {
+                        validator.validateField(pd + "Each element of the list ", name, e);
+                    }
+                    return;
+                }
+                throw new IllegalArgumentException(
+                    "Field " + name + " must be an Iterable but was " +
+                    ((field == null) ? "null" : ("a " + field.getClass())));
+            }
+        };
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a Map of key to val.
+     *
+     * @param key     the Class of keys in the map
+     * @param val     the Class of values in the map
+     * @param notNull whether or not a value of null is valid
+     * @return a NestableFieldValidator for a Map of key to val
+     */
+    public static NestableFieldValidator mapFv(Class key, Class val,
+                                               boolean notNull) {
+        return mapFv(fv(key, false), fv(val, false), notNull);
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a Map.
+     *
+     * @param key     a validator for the keys in the map
+     * @param val     a validator for the values in the map
+     * @param notNull whether or not a value of null is valid
+     * @return a NestableFieldValidator for a Map
+     */
+    public static NestableFieldValidator mapFv(final NestableFieldValidator key,
+                                               final NestableFieldValidator val, final boolean notNull) {
+        return new NestableFieldValidator() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public void validateField(String pd, String name, Object field)
+                throws IllegalArgumentException {
+                if (field == null) {
+                    if (notNull) {
+                        throw new IllegalArgumentException("Field " + name + " must not be null");
+                    } else {
+                        return;
+                    }
+                }
+                if (field instanceof Map) {
+                    for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) field).entrySet()) {
+                        key.validateField("Each key of the map ", name, entry.getKey());
+                        val.validateField("Each value in the map ", name, entry.getValue());
+                    }
+                    return;
+                }
+                throw new IllegalArgumentException(
+                    "Field " + name + " must be a Map");
+            }
+        };
+    }
+
+    /**
+     * Declares methods for validating configuration values.
+     */
+    public static interface FieldValidator {
+        /**
+         * Validates the given field.
+         *
+         * @param name  the name of the field.
+         * @param field The field to be validated.
+         * @throws IllegalArgumentException if the field fails validation.
+         */
+        public void validateField(String name, Object field) throws IllegalArgumentException;
+    }
+
+    /**
+     * Declares a method for validating configuration values that is nestable.
+     */
+    public static abstract class NestableFieldValidator implements FieldValidator {
+        @Override
+        public void validateField(String name, Object field) throws IllegalArgumentException {
+            validateField(null, name, field);
+        }
+
+        /**
+         * Validates the given field.
+         *
+         * @param pd    describes the parent wrapping this validator.
+         * @param name  the name of the field.
+         * @param field The field to be validated.
+         * @throws IllegalArgumentException if the field fails validation.
+         */
+        public abstract void validateField(String pd, String name, Object field) throws IllegalArgumentException;
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
new file mode 100644
index 0000000000..59410482a1
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import java.util.Map;
+
+public abstract class Validator {
+    public Validator(Map<String, Object> params) {
+    }
+
+    public Validator() {
+    }
+
+    public abstract void validateField(String name, Object o);
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
new file mode 100644
index 0000000000..64c23f537b
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -0,0 +1,655 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Resources;
+import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.WindowConfig;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+
+@Slf4j
+public class ValidatorImpls {
+    /**
+     * Validates a positive number.
+     */
+    public static class PositiveNumberValidator extends Validator {
+
+        private boolean includeZero;
+
+        public PositiveNumberValidator() {
+            this.includeZero = false;
+        }
+
+        public PositiveNumberValidator(Map<String, Object> params) {
+            this.includeZero = (boolean) params.get(ConfigValidationAnnotations.ValidatorParams.INCLUDE_ZERO);
+        }
+
+        public static void validateField(String name, boolean includeZero, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (o instanceof Number) {
+                if (includeZero) {
+                    if (((Number) o).doubleValue() >= 0.0) {
+                        return;
+                    }
+                } else {
+                    if (((Number) o).doubleValue() > 0.0) {
+                        return;
+                    }
+                }
+            }
+            throw new IllegalArgumentException(String.format("Field '%s' must be a Positive Number", name));
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.includeZero, o);
+        }
+    }
+
+    /**
+     * Validates if an object is not null.
+     */
+
+    public static class NotNullValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name));
+            }
+        }
+    }
+
+    @NoArgsConstructor
+    public static class ResourcesValidator extends Validator {
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+               throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name));
+            }
+
+            if (o instanceof Resources) {
+                Resources resources = (Resources) o;
+                Double cpu = resources.getCpu();
+                Long ram = resources.getRam();
+                Long disk = resources.getDisk();
+                com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0.0,
+                        "The cpu allocation for the function must be positive");
+                com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0L,
+                        "The ram allocation for the function must be positive");
+                com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0L,
+                        "The disk allocation for the function must be positive");
+            } else {
+                throw new IllegalArgumentException(String.format("Field '%s' must be of Resource type!", name));
+            }
+        }
+    }
+
+    /**
+     * Validates each entry in a list.
+     */
+    public static class ListEntryTypeValidator extends Validator {
+
+        private Class<?> type;
+
+        public ListEntryTypeValidator(Map<String, Object> params) {
+            this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+        }
+
+        public static void validateField(String name, Class<?> type, Object o) {
+            ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.listFv(type, false);
+            validator.validateField(name, o);
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.type, o);
+        }
+    }
+
+    /**
+     * validates each key and value in a map of a certain type.
+     */
+    public static class MapEntryTypeValidator extends Validator {
+
+        private Class<?> keyType;
+        private Class<?> valueType;
+
+        public MapEntryTypeValidator(Map<String, Object> params) {
+            this.keyType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE);
+            this.valueType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE);
+        }
+
+        public static void validateField(String name, Class<?> keyType, Class<?> valueType, Object o) {
+            ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(keyType, valueType, false);
+            validator.validateField(name, o);
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.keyType, this.valueType, o);
+        }
+    }
+
+    public static class ImplementsClassValidator extends Validator {
+
+        Class<?> classImplements;
+
+        public ImplementsClassValidator(Map<String, Object> params) {
+            this.classImplements = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
+        }
+
+        public ImplementsClassValidator(Class<?> classImplements) {
+            this.classImplements = classImplements;
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            SimpleTypeValidator.validateField(name, String.class, o);
+            String className = (String) o;
+            try {
+                ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+                Class<?> objectClass = clsLoader.loadClass(className);
+                if (!this.classImplements.isAssignableFrom(objectClass)) {
+                    throw new IllegalArgumentException(
+                            String.format("Field '%s' with value '%s' does not implement %s ",
+                                    name, o, this.classImplements.getName()));
+                }
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * validates class implements one of these classes
+     */
+    public static class ImplementsClassesValidator extends Validator {
+
+        Class<?>[] classesImplements;
+
+        public ImplementsClassesValidator(Map<String, Object> params) {
+            this.classesImplements = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASSES);
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            SimpleTypeValidator.validateField(name, String.class, o);
+            String className = (String) o;
+            int count = 0;
+            for (Class<?> classImplements : classesImplements) {
+                Class<?> objectClass = null;
+                try {
+                    objectClass = loadClass(className);
+                } catch (ClassNotFoundException e) {
+                    throw new IllegalArgumentException("Cannot find/load class " + className);
+                }
+
+                if (classImplements.isAssignableFrom(objectClass)) {
+                    count++;
+                }
+            }
+            if (count == 0) {
+                throw new IllegalArgumentException(
+                        String.format("Field '%s' with value '%s' does not implement any of these classes %s",
+                                name, o, classesImplements));
+            }
+        }
+    }
+
+    @NoArgsConstructor
+    public static class SerdeValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, o);
+        }
+    }
+
+    /**
+     * validates each key and each value against the respective arrays of validators.
+     */
+    public static class MapEntryCustomValidator extends Validator {
+
+        private Class<?>[] keyValidators;
+        private Class<?>[] valueValidators;
+
+        public MapEntryCustomValidator(Map<String, Object> params) {
+            this.keyValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES);
+            this.valueValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
+        }
+
+        @SuppressWarnings("unchecked")
+        public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o)
+                throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+            if (o == null) {
+                return;
+            }
+            //check if Map
+            SimpleTypeValidator.validateField(name, Map.class, o);
+            for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) {
+                for (Class<?> kv : keyValidators) {
+                    Object keyValidator = kv.getConstructor().newInstance();
+                    if (keyValidator instanceof Validator) {
+                        ((Validator) keyValidator).validateField(name + " Map key", entry.getKey());
+                    } else {
+                        log.warn(
+                                "validator: {} cannot be used in MapEntryCustomValidator to validate keys.  Individual entry validators must " +
+                                        "a instance of Validator class",
+                                kv.getName());
+                    }
+                }
+                for (Class<?> vv : valueValidators) {
+                    Object valueValidator = vv.getConstructor().newInstance();
+                    if (valueValidator instanceof Validator) {
+                        ((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
+                    } else {
+                        log.warn(
+                                "validator: {} cannot be used in MapEntryCustomValidator to validate values.  Individual entry validators " +
+                                        "must a instance of Validator class",
+                                vv.getName());
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            try {
+                validateField(name, this.keyValidators, this.valueValidators, o);
+            } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @NoArgsConstructor
+    public static class StringValidator extends Validator {
+
+        private HashSet<String> acceptedValues = null;
+
+        public StringValidator(Map<String, Object> params) {
+
+            this.acceptedValues =
+                    new HashSet<String>(Arrays.asList((String[]) params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES)));
+
+            if (this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) {
+                this.acceptedValues = null;
+            }
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            SimpleTypeValidator.validateField(name, String.class, o);
+            if (this.acceptedValues != null) {
+                if (!this.acceptedValues.contains((String) o)) {
+                    throw new IllegalArgumentException(
+                            "Field " + name + " is not an accepted value. Value: " + o + " Accepted values: " + this.acceptedValues);
+                }
+            }
+        }
+    }
+    @NoArgsConstructor
+    public static class FunctionConfigValidator extends Validator {
+
+        private static void doJavaChecks(FunctionConfig functionConfig, String name) {
+            Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig);
+
+            ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+            // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
+            // implements SerDe class
+            functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
+
+
+                Class<?> serdeClass;
+                try {
+                    serdeClass = loadClass(inputSerializer);
+                } catch (ClassNotFoundException e) {
+                    throw new IllegalArgumentException(
+                            String.format("The input serialization/deserialization class %s does not exist",
+                                    inputSerializer));
+                }
+
+                try {
+                    new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer);
+                } catch (IllegalArgumentException ex) {
+                    throw new IllegalArgumentException(
+                            String.format("The input serialization/deserialization class %s does not not implement %s",
+
+                                    inputSerializer, SerDe.class.getCanonicalName()));
+                }
+
+                if (inputSerializer.equals(DefaultSerDe.class.getName())) {
+                    if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
+                        throw new IllegalArgumentException("The default Serializer does not support type " +
+                                typeArgs[0]);
+                    }
+                } else {
+                    SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
+                    if (serDe == null) {
+                        throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
+                                inputSerializer));
+                    }
+                    Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+
+                    // type inheritance information seems to be lost in generic type
+                    // load the actual type class for verification
+                    Class<?> fnInputClass;
+                    Class<?> serdeInputClass;
+                    try {
+                        fnInputClass = Class.forName(typeArgs[0].getName(), true, clsLoader);
+                        serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
+                    } catch (ClassNotFoundException e) {
+                        throw new IllegalArgumentException("Failed to load type class", e);
+                    }
+
+                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+                        throw new IllegalArgumentException("Serializer type mismatch " + typeArgs[0] + " vs " + serDeTypes[0]);
+                    }
+                }
+            });
+            functionConfig.getInputs().forEach((topicName) -> {
+                if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
+                    throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]);
+                }
+            });
+            if (!Void.class.equals(typeArgs[1])) {
+                if (functionConfig.getOutputSerdeClassName() == null
+                        || functionConfig.getOutputSerdeClassName().isEmpty()
+                        || functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
+                    if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
+                        throw new RuntimeException("Default Serializer does not support type " + typeArgs[1]);
+                    }
+                } else {
+                    SerDe serDe = (SerDe) Reflections.createInstance(functionConfig.getOutputSerdeClassName(),
+                            clsLoader);
+                    if (serDe == null) {
+                        throw new IllegalArgumentException(String.format("SerDe class %s does not exist",
+                                functionConfig.getOutputSerdeClassName()));
+                    }
+                    Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+
+                    // type inheritance information seems to be lost in generic type
+                    // load the actual type class for verification
+                    Class<?> fnOutputClass;
+                    Class<?> serdeOutputClass;
+                    try {
+                        fnOutputClass = Class.forName(typeArgs[1].getName(), true, clsLoader);
+                        serdeOutputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
+                    } catch (ClassNotFoundException e) {
+                        throw new RuntimeException("Failed to load type class", e);
+                    }
+
+                    if (!serdeOutputClass.isAssignableFrom(fnOutputClass)) {
+                        throw new RuntimeException("Serializer type mismatch " + typeArgs[1] + " vs " + serDeTypes[0]);
+                    }
+                }
+            }
+        }
+
+        private static void doPythonChecks(FunctionConfig functionConfig, String name) {
+            if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
+            }
+
+            if (functionConfig.getWindowConfig() != null) {
+                throw new IllegalArgumentException("There is currently no support windowing in python");
+            }
+        }
+
+        private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
+            if (inputTopics.contains(outputTopic)) {
+                throw new IllegalArgumentException(
+                        String.format("Output topic %s is also being used as an input topic (topics must be one or the other)",
+                                outputTopic));
+            }
+        }
+
+        private static void doCommonChecks(FunctionConfig functionConfig) {
+            if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) {
+                throw new RuntimeException("No input topic(s) specified for the function");
+            }
+
+            // Ensure that topics aren't being used as both input and output
+            verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput());
+
+            if (functionConfig.getSubscriptionType() != null
+                    && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER
+                    && functionConfig.getProcessingGuarantees() != null
+                    && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type");
+            }
+
+            WindowConfig windowConfig = functionConfig.getWindowConfig();
+            if (windowConfig != null) {
+                // set auto ack to false since windowing framework is responsible
+                // for acking and not the function framework
+                if (functionConfig.isAutoAck() == true) {
+                    throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+                }
+                functionConfig.setAutoAck(false);
+            }
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            FunctionConfig functionConfig = (FunctionConfig) o;
+            doCommonChecks(functionConfig);
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                doJavaChecks(functionConfig, name);
+            } else {
+                doPythonChecks(functionConfig, name);
+            }
+        }
+    }
+
+    /**
+     * Validates each entry in a list against a list of custom Validators. Each validator in the list of validators must inherit or be an
+     * instance of Validator class
+     */
+    public static class ListEntryCustomValidator extends Validator {
+
+        private Class<?>[] entryValidators;
+
+        public ListEntryCustomValidator(Map<String, Object> params) {
+            this.entryValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
+        }
+
+        public static void validateField(String name, Class<?>[] validators, Object o)
+                throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+            if (o == null) {
+                return;
+            }
+            //check if iterable
+            SimpleTypeValidator.validateField(name, Iterable.class, o);
+            for (Object entry : (Iterable<?>) o) {
+                for (Class<?> validator : validators) {
+                    Object v = validator.getConstructor().newInstance();
+                    if (v instanceof Validator) {
+                        ((Validator) v).validateField(name + " list entry", entry);
+                    } else {
+                        log.warn(
+                                "validator: {} cannot be used in ListEntryCustomValidator.  Individual entry validators must a instance of " +
+                                        "Validator class",
+                                validator.getName());
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            try {
+                validateField(name, this.entryValidators, o);
+            } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @NoArgsConstructor
+    public static class TopicNameValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            new StringValidator().validateField(name, o);
+            String topic = (String) o;
+            if (!TopicName.isValid(topic)) {
+                throw new IllegalArgumentException(
+                        String.format("The topic name %s is invalid for field '%s'", topic, name));
+            }
+        }
+    }
+
+    public static class WindowConfigValidator extends Validator{
+
+        public static void validateWindowConfig(WindowConfig windowConfig) {
+            if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) {
+                throw new IllegalArgumentException("Window length is not specified");
+            }
+
+            if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) {
+                throw new IllegalArgumentException(
+                        "Window length for time and count are set! Please set one or the other.");
+            }
+
+            if (windowConfig.getWindowLengthCount() != null) {
+                if (windowConfig.getWindowLengthCount() <= 0) {
+                    throw new IllegalArgumentException(
+                            "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
+                }
+            }
+
+            if (windowConfig.getWindowLengthDurationMs() != null) {
+                if (windowConfig.getWindowLengthDurationMs() <= 0) {
+                    throw new IllegalArgumentException(
+                            "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
+                }
+            }
+
+            if (windowConfig.getSlidingIntervalCount() != null) {
+                if (windowConfig.getSlidingIntervalCount() <= 0) {
+                    throw new IllegalArgumentException(
+                            "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
+                }
+            }
+
+            if (windowConfig.getSlidingIntervalDurationMs() != null) {
+                if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
+                    throw new IllegalArgumentException(
+                            "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
+                }
+            }
+
+            if (windowConfig.getTimestampExtractorClassName() != null) {
+                if (windowConfig.getMaxLagMs() != null) {
+                    if (windowConfig.getMaxLagMs() < 0) {
+                        throw new IllegalArgumentException(
+                                "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
+                    }
+                }
+                if (windowConfig.getWatermarkEmitIntervalMs() != null) {
+                    if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
+                        throw new IllegalArgumentException(
+                                "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (!(o instanceof WindowConfig)) {
+                throw new IllegalArgumentException(String.format("Field '%s' must be of WindowConfig type!", name));
+            }
+            WindowConfig windowConfig = (WindowConfig) o;
+            validateWindowConfig(windowConfig);
+        }
+    }
+
+    /**
+     * Validates basic types.
+     */
+    public static class SimpleTypeValidator extends Validator {
+
+        private Class<?> type;
+
+        public SimpleTypeValidator(Map<String, Object> params) {
+            this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+        }
+
+        public static void validateField(String name, Class<?> type, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (type.isInstance(o)) {
+                return;
+            }
+            throw new IllegalArgumentException(
+                    "Field " + name + " must be of type " + type + ". Object: " + o + " actual type: " + o.getClass());
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.type, o);
+        }
+    }
+
+    private static Class<?> loadClass(String className) throws ClassNotFoundException {
+        Class<?> objectClass;
+        try {
+            objectClass = Class.forName(className);
+        } catch (ClassNotFoundException e) {
+            ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+            if (clsLoader != null) {
+                objectClass = clsLoader.loadClass(className);
+            } else {
+                throw e;
+            }
+        }
+        return objectClass;
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services