You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/05/30 19:51:19 UTC
[incubator-pulsar] branch master updated: improving config
validation (#1859)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ecec933 improving config validation (#1859)
ecec933 is described below
commit ecec9337094820c65fe5ecf128f8070a3538bddd
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed May 30 12:51:11 2018 -0700
improving config validation (#1859)
* improving config validation
* removing unnecessary file
* removing unnecessary log
* fix bug
* fix potential NPE
---
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 7 +-
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 303 ++--------
.../windowing/WindowFunctionExecutor.java | 5 +-
.../pulsar/functions/windowing/WindowUtils.java | 53 +-
.../windowing/WindowFunctionExecutorTest.java | 4 +-
pulsar-functions/utils/pom.xml | 5 +
.../pulsar/functions/utils/FunctionConfig.java | 34 +-
.../org/apache/pulsar/functions/utils/Utils.java | 73 +++
.../utils/validation/ConfigValidation.java | 119 ++++
.../validation/ConfigValidationAnnotations.java | 183 ++++++
.../utils/validation/ConfigValidationUtils.java | 177 ++++++
.../functions/utils/validation/Validator.java | 31 +
.../functions/utils/validation/ValidatorImpls.java | 655 +++++++++++++++++++++
13 files changed, 1352 insertions(+), 297 deletions(-)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index de4ab40..1cc239d 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -86,7 +86,12 @@ public class CmdFunctionsTest {
private Functions functions;
private CmdFunctions cmd;
- public class DummyFunction implements Function<String, String> {
+ public static class DummyFunction implements Function<String, String> {
+
+ public DummyFunction() {
+
+ }
+
@Override
public String process(String input, Context context) throws Exception {
return null;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index d565568..94e17ad 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -18,14 +18,8 @@
*/
package org.apache.pulsar.admin.cli;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.isNull;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-
import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,31 +29,12 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-
+import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -70,10 +45,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
@@ -87,10 +59,33 @@ import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
import org.apache.pulsar.functions.windowing.WindowUtils;
-import net.jodah.typetools.TypeResolver;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
@Slf4j
@Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -228,7 +223,7 @@ public class CmdFunctions extends CmdBase {
@Parameter(names = "--userConfig", description = "User-defined config key/values")
protected String userConfigString;
@Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. the number of function instances to run)")
- protected String parallelism;
+ protected Integer parallelism;
@Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)")
protected Double cpu;
@Parameter(names = "--ram", description = "The ram in bytes that need to be allocated per function instance(applicable only to process/docker runtime)")
@@ -275,19 +270,14 @@ public class CmdFunctions extends CmdBase {
if (null != inputs) {
List<String> inputTopics = Arrays.asList(inputs.split(","));
- inputTopics.forEach(this::validateTopicName);
functionConfig.setInputs(inputTopics);
}
if (null != customSerdeInputString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
- customSerdeInputMap.forEach((topic, serde) -> {
- validateTopicName(topic);
- });
functionConfig.setCustomSerdeInputs(customSerdeInputMap);
}
if (null != output) {
- validateTopicName(output);
functionConfig.setOutput(output);
}
if (null != logTopic) {
@@ -320,37 +310,12 @@ public class CmdFunctions extends CmdBase {
functionConfig.setUserConfig(new HashMap<>());
}
- if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) {
- throw new RuntimeException("No input topic(s) specified for the function");
- }
-
- // Ensure that topics aren't being used as both input and output
- verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput());
-
- if (parallelism == null) {
- if (functionConfig.getParallelism() == 0) {
- functionConfig.setParallelism(1);
- }
- } else {
- int num = Integer.parseInt(parallelism);
- if (num <= 0) {
- throw new IllegalArgumentException("The parallelism factor (the number of instances) for the function must be positive");
- }
- functionConfig.setParallelism(num);
+ if (parallelism != null) {
+ functionConfig.setParallelism(parallelism);
}
- com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0, "The cpu allocation for the function must be positive");
- com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0, "The ram allocation for the function must be positive");
- com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0, "The disk allocation for the function must be positive");
functionConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk));
- if (functionConfig.getSubscriptionType() != null
- && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER
- && functionConfig.getProcessingGuarantees() != null
- && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
- throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type");
- }
-
// window configs
WindowConfig windowConfig = functionConfig.getWindowConfig();
if (null != windowLengthCount) {
@@ -377,15 +342,7 @@ public class CmdFunctions extends CmdBase {
}
windowConfig.setSlidingIntervalDurationMs(slidingIntervalDurationMs);
}
- if (windowConfig != null) {
- WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig);
- // set auto ack to false since windowing framework is responsible
- // for acking and not the function framework
- if (autoAck != null && autoAck == true) {
- throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
- }
- functionConfig.setAutoAck(false);
- }
+
functionConfig.setWindowConfig(windowConfig);
if (null != autoAck) {
@@ -394,188 +351,43 @@ public class CmdFunctions extends CmdBase {
functionConfig.setAutoAck(true);
}
- inferMissingArguments(functionConfig);
if (null != jarFile) {
- doJavaSubmitChecks(functionConfig);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
userCodeFile = jarFile;
} else if (null != pyFile) {
- doPythonSubmitChecks(functionConfig);
functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
userCodeFile = pyFile;
} else {
throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function");
}
- }
- private Class<?>[] getFunctionTypes(File file, FunctionConfig functionConfig) {
- assertClassExistsInJar(file);
-
- Object userClass = Reflections.createInstance(functionConfig.getClassName(), file);
- Class<?>[] typeArgs;
- // if window function
- if (functionConfig.getWindowConfig() != null) {
- java.util.function.Function function = (java.util.function.Function) userClass;
- if (function == null) {
- throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated from jar %s",
- functionConfig.getClassName(), jarFile));
- }
- typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
- if (!typeArgs[0].equals(Collection.class)) {
- throw new IllegalArgumentException("Window function must take a collection as input");
- }
- Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
- Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
- Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
- typeArgs[0] = (Class<?>) actualInputType;
- } else {
- if (userClass instanceof Function) {
- Function pulsarFunction = (Function) userClass;
- if (pulsarFunction == null) {
- throw new IllegalArgumentException(String.format("The Pulsar function class %s could not be instantiated from jar %s",
- functionConfig.getClassName(), jarFile));
- }
- typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
- } else {
- java.util.function.Function function = (java.util.function.Function) userClass;
- if (function == null) {
- throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated from jar %s",
- functionConfig.getClassName(), jarFile));
- }
- typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
- }
- }
+ // infer default vaues
+ inferMissingArguments(functionConfig);
- return typeArgs;
+ // check if function configs are valid
+ validateFunctionConfigs(functionConfig);
}
- private void assertClassExistsInJar(File file) {
- if (!Reflections.classExistsInJar(file, functionConfig.getClassName())) {
- throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s",
- functionConfig.getClassName(), jarFile));
- } else if (!Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), Function.class)
- && !Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), java.util.function.Function.class)) {
- throw new IllegalArgumentException(String.format("The Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function",
- functionConfig.getClassName(), jarFile));
- }
- }
+ private void validateFunctionConfigs(FunctionConfig functionConfig) {
- private void doJavaSubmitChecks(FunctionConfig functionConfig) {
- if (isNull(functionConfig.getClassName())) {
- throw new IllegalArgumentException("You supplied a jar file but no main class");
- }
-
- File file = new File(jarFile);
- ClassLoader userJarLoader;
- try {
- userJarLoader = Reflections.loadJar(file);
- } catch (MalformedURLException e) {
- throw new RuntimeException("Failed to load user jar " + file, e);
- }
- Class<?>[] typeArgs = getFunctionTypes(file, functionConfig);
- // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
- // implements SerDe class
- functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
- if (!Reflections.classExists(inputSerializer)
- && !Reflections.classExistsInJar(new File(jarFile), inputSerializer)) {
- throw new IllegalArgumentException(
- String.format("The input serialization/deserialization class %s does not exist",
- inputSerializer));
- } else if (Reflections.classExists(inputSerializer)) {
- if (!Reflections.classImplementsIface(inputSerializer, SerDe.class)) {
- throw new IllegalArgumentException(String.format("The input serialization/deserialization class %s does not not implement %s",
- inputSerializer, SerDe.class.getCanonicalName()));
- }
- } else if (Reflections.classExistsInJar(new File(jarFile), inputSerializer)) {
- if (!Reflections.classInJarImplementsIface(new File(jarFile), inputSerializer, SerDe.class)) {
- throw new IllegalArgumentException(String.format("The input serialization/deserialization class %s does not not implement %s",
- inputSerializer, SerDe.class.getCanonicalName()));
- }
- }
- if (inputSerializer.equals(DefaultSerDe.class.getName())) {
- if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
- throw new RuntimeException("The default Serializer does not support type " + typeArgs[0]);
- }
- } else {
- SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, file);
- if (serDe == null) {
- throw new IllegalArgumentException(String.format("The SerDe class %s does not exist in jar %s",
- inputSerializer, jarFile));
- }
- Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
- // type inheritance information seems to be lost in generic type
- // load the actual type class for verification
- Class<?> fnInputClass;
- Class<?> serdeInputClass;
- try {
- fnInputClass = Class.forName(typeArgs[0].getName(), true, userJarLoader);
- serdeInputClass = Class.forName(serDeTypes[0].getName(), true, userJarLoader);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Failed to load type class", e);
- }
-
- if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
- throw new RuntimeException("Serializer type mismatch " + typeArgs[0] + " vs " + serDeTypes[0]);
- }
- }
- });
- functionConfig.getInputs().forEach((topicName) -> {
- if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
- throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]);
- }
- });
- if (!Void.class.equals(typeArgs[1])) {
- if (functionConfig.getOutputSerdeClassName() == null
- || functionConfig.getOutputSerdeClassName().isEmpty()
- || functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
- if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
- throw new RuntimeException("Default Serializer does not support type " + typeArgs[1]);
- }
- } else {
- SerDe serDe = (SerDe) Reflections.createInstance(functionConfig.getOutputSerdeClassName(), file);
- if (serDe == null) {
- throw new IllegalArgumentException(String.format("SerDe class %s does not exist in jar %s",
- functionConfig.getOutputSerdeClassName(), jarFile));
- }
- Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
- // type inheritance information seems to be lost in generic type
- // load the actual type class for verification
- Class<?> fnOutputClass;
- Class<?> serdeOutputClass;
- try {
- fnOutputClass = Class.forName(typeArgs[1].getName(), true, userJarLoader);
- serdeOutputClass = Class.forName(serDeTypes[0].getName(), true, userJarLoader);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Failed to load type class", e);
- }
-
- if (!serdeOutputClass.isAssignableFrom(fnOutputClass)) {
- throw new RuntimeException("Serializer type mismatch " + typeArgs[1] + " vs " + serDeTypes[0]);
- }
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+ File file = new File(jarFile);
+ ClassLoader userJarLoader;
+ try {
+ userJarLoader = Reflections.loadJar(file);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException("Failed to load user jar " + file, e);
}
- }
- }
-
- private void doPythonSubmitChecks(FunctionConfig functionConfig) {
- if (functionConfig.getClassName() == null) {
- throw new IllegalArgumentException("You specified a Python file but no main class name");
- }
-
- if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
- throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
+ // make sure the function class loader is accessible thread-locally
+ Thread.currentThread().setContextClassLoader(userJarLoader);
}
- if (functionConfig.getWindowConfig() != null) {
- throw new IllegalArgumentException("There is currently no support windowing in python");
- }
- }
-
- private void validateTopicName(String topic) {
- if (!TopicName.isValid(topic)) {
- throw new IllegalArgumentException(String.format("The topic name %s is invalid", topic));
+ try {
+ // Need to load jar and set context class loader before calling
+ ConfigValidation.validateConfig(functionConfig);
+ } catch (Exception e) {
+ throw new ParameterException(e.getMessage());
}
}
@@ -592,6 +404,21 @@ public class CmdFunctions extends CmdBase {
if (StringUtils.isEmpty(functionConfig.getOutput())) {
inferMissingOutput(functionConfig);
}
+
+ if (functionConfig.getParallelism() == 0) {
+ functionConfig.setParallelism(1);
+ }
+
+ WindowConfig windowConfig = functionConfig.getWindowConfig();
+ if (windowConfig != null) {
+ WindowUtils.inferDefaultConfigs(windowConfig);
+ // set auto ack to false since windowing framework is responsible
+ // for acking and not the function framework
+ if (autoAck != null && autoAck == true) {
+ throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+ }
+ functionConfig.setAutoAck(false);
+ }
}
private void inferMissingFunctionName(FunctionConfig functionConfig) {
@@ -649,7 +476,7 @@ public class CmdFunctions extends CmdBase {
} catch (MalformedURLException e) {
throw new RuntimeException("Failed to load user jar " + file, e);
}
- typeArgs = getFunctionTypes(file, functionConfig);
+ typeArgs = Utils.getFunctionTypes(functionConfig);
}
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index a82a195..8f16eac 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy;
@@ -93,7 +94,9 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> {
(new Gson().toJson(context.getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY).get())),
WindowConfig.class);
- WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig);
+
+ WindowUtils.inferDefaultConfigs(windowConfig);
+ ValidatorImpls.WindowConfigValidator.validateWindowConfig(windowConfig);
return windowConfig;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
index 6de61c2..73dda87 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
@@ -25,44 +25,7 @@ public class WindowUtils {
return String.format("%s/%s/%s", tenant, namespace, name);
}
- public static void validateAndSetDefaultsWindowConfig(WindowConfig windowConfig) {
- if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) {
- throw new IllegalArgumentException("Window length is not specified");
- }
-
- if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) {
- throw new IllegalArgumentException(
- "Window length for time and count are set! Please set one or the other.");
- }
-
- if (windowConfig.getWindowLengthCount() != null) {
- if (windowConfig.getWindowLengthCount() <= 0) {
- throw new IllegalArgumentException(
- "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
- }
- }
-
- if (windowConfig.getWindowLengthDurationMs() != null) {
- if (windowConfig.getWindowLengthDurationMs() <= 0) {
- throw new IllegalArgumentException(
- "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
- }
- }
-
- if (windowConfig.getSlidingIntervalCount() != null) {
- if (windowConfig.getSlidingIntervalCount() <= 0) {
- throw new IllegalArgumentException(
- "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
- }
- }
-
- if (windowConfig.getSlidingIntervalDurationMs() != null) {
- if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
- throw new IllegalArgumentException(
- "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
- }
- }
-
+ public static void inferDefaultConfigs(WindowConfig windowConfig) {
if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getSlidingIntervalDurationMs() == null) {
windowConfig.setSlidingIntervalDurationMs(windowConfig.getWindowLengthDurationMs());
}
@@ -72,20 +35,10 @@ public class WindowUtils {
}
if (windowConfig.getTimestampExtractorClassName() != null) {
- if (windowConfig.getMaxLagMs() != null) {
- if (windowConfig.getMaxLagMs() <= 0) {
- throw new IllegalArgumentException(
- "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
- }
- } else {
+ if (windowConfig.getMaxLagMs() == null) {
windowConfig.setMaxLagMs(WindowFunctionExecutor.DEFAULT_MAX_LAG_MS);
}
- if (windowConfig.getWatermarkEmitIntervalMs() != null) {
- if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
- throw new IllegalArgumentException(
- "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
- }
- } else {
+ if (windowConfig.getWatermarkEmitIntervalMs() == null) {
windowConfig.setWatermarkEmitIntervalMs(WindowFunctionExecutor.DEFAULT_WATERMARK_EVENT_INTERVAL_MS);
}
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index 33565bc..c5d5d4a 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -537,8 +537,8 @@ public class WindowFunctionExecutorTest {
if (arg0 == null) {
Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs(),
new Long(testWindowedPulsarFunction.DEFAULT_MAX_LAG_MS));
- } else if((Long) arg0 <= 0) {
- fail(String.format("Window lag cannot be zero or less -- lagTime: %s", arg0));
+ } else if((Long) arg0 < 0) {
+ fail(String.format("Window lag cannot be less than zero -- lagTime: %s", arg0));
} else {
Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs().longValue(),
maxLagMs.longValue());
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index b278ed7..01710b1 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -82,6 +82,11 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>net.jodah</groupId>
+ <artifactId>typetools</artifactId>
+ </dependency>
+
</dependencies>
</project>
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index ffdda63..2940e32 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -23,10 +23,21 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidFunctionConfig;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
+import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidWindowConfig;
+import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
import java.util.Map;
@Getter
@@ -34,6 +45,7 @@ import java.util.Map;
@Data
@EqualsAndHashCode
@ToString
+@isValidFunctionConfig
public class FunctionConfig {
public enum ProcessingGuarantees {
@@ -61,25 +73,37 @@ public class FunctionConfig {
PYTHON
}
+
+ @NotNull
private String tenant;
+ @NotNull
private String namespace;
+ @NotNull
private String name;
+ @NotNull
+ @isImplementationOfClasses(implementsClasses = {Function.class, java.util.function.Function.class})
private String className;
-
+ @isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
private Collection<String> inputs;
+ @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
+ valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
private Map<String, String> customSerdeInputs;
-
+ @isValidTopicName
private String output;
+ @isImplementationOfClass(implementsClass = SerDe.class)
private String outputSerdeClassName;
-
+ @isValidTopicName
private String logTopic;
private ProcessingGuarantees processingGuarantees;
private Map<String, Object> userConfig;
private SubscriptionType subscriptionType;
private Runtime runtime;
private boolean autoAck;
+ @isPositiveNumber
private int parallelism;
+ @isValidResources
private Resources resources;
private String fqfn;
+ @isValidWindowConfig
private WindowConfig windowConfig;
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 68ac86b..2963e57 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -21,17 +21,28 @@ package org.apache.pulsar.functions.utils;
import com.google.protobuf.AbstractMessage.Builder;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
+
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.net.ServerSocket;
+import java.util.Collection;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.functions.api.Function;
+
+import net.jodah.typetools.TypeResolver;
/**
* Utils used for runtime.
*/
+@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class Utils {
@@ -76,4 +87,66 @@ public class Utils {
throw new RuntimeException("No free port found", ex);
}
}
+
+ public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) {
+
+ Object userClass = createInstance(functionConfig.getClassName(), Thread.currentThread().getContextClassLoader());
+
+ Class<?>[] typeArgs;
+ // if window function
+ if (functionConfig.getWindowConfig() != null) {
+ java.util.function.Function function = (java.util.function.Function) userClass;
+ if (function == null) {
+ throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated",
+ functionConfig.getClassName()));
+ }
+ typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
+ if (!typeArgs[0].equals(Collection.class)) {
+ throw new IllegalArgumentException("Window function must take a collection as input");
+ }
+ Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
+ Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
+ Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
+ typeArgs[0] = (Class<?>) actualInputType;
+ } else {
+ if (userClass instanceof Function) {
+ Function pulsarFunction = (Function) userClass;
+ typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
+ } else {
+ java.util.function.Function function = (java.util.function.Function) userClass;
+ typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
+ }
+ }
+
+ return typeArgs;
+ }
+
+ public static Object createInstance(String userClassName, ClassLoader classLoader) {
+ Class<?> theCls;
+ try {
+ theCls = Class.forName(userClassName);
+ } catch (ClassNotFoundException cnfe) {
+ try {
+ theCls = Class.forName(userClassName, true, classLoader);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("User class must be in class path", cnfe);
+ }
+ }
+ Object result;
+ try {
+ Constructor<?> meth = theCls.getDeclaredConstructor();
+ meth.setAccessible(true);
+ result = meth.newInstance();
+ } catch (InstantiationException ie) {
+ throw new RuntimeException("User class must be concrete", ie);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("User class doesn't have such method", e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("User class must have a no-arg constructor", e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("User class constructor throws exception", e);
+ }
+ return result;
+
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
new file mode 100644
index 0000000..b665d1a
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class ConfigValidation {
+
+ public static void validateConfig(Object config) {
+ for (Field field : config.getClass().getDeclaredFields()) {
+ Object value = null;
+ field.setAccessible(true);
+ try {
+ value = field.get(config);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ validateField(field, value);
+ }
+ validateClass(config);
+ }
+
+ private static void validateClass(Object config) {
+ processAnnotations(config.getClass().getAnnotations(), config.getClass().getName(), config);
+ }
+
+ private static void validateField(Field field, Object value) {
+ processAnnotations(field.getAnnotations(), field.getName(), value);
+ }
+
+ private static void processAnnotations(Annotation[] annotations, String fieldName, Object value) {
+ try {
+ for (Annotation annotation : annotations) {
+
+ String type = annotation.annotationType().getName();
+ Class<?> validatorClass = null;
+ Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
+ //check if annotation is one of our
+ for (Class<?> clazz : classes) {
+ if (clazz.getName().equals(type)) {
+ validatorClass = clazz;
+ break;
+ }
+ }
+ if (validatorClass != null) {
+ Object v = validatorClass.cast(annotation);
+ @SuppressWarnings("unchecked")
+ Class<Validator> clazz = (Class<Validator>) validatorClass
+ .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
+ Validator o = null;
+ Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
+ //two constructor signatures used to initialize validators.
+ //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
+ //If validator has a constructor that takes a Map as an argument call that constructor
+ if (hasConstructor(clazz, Map.class)) {
+ o = clazz.getConstructor(Map.class).newInstance(params);
+ } else { //If not call default constructor
+ o = clazz.newInstance();
+ }
+ o.validateField(fieldName, value);
+ }
+ }
+ } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static Map<String, Object> getParamsFromAnnotation(Class<?> validatorClass, Object v)
+ throws InvocationTargetException, IllegalAccessException {
+ Map<String, Object> params = new HashMap<String, Object>();
+ for (Method method : validatorClass.getDeclaredMethods()) {
+
+ Object value = null;
+ try {
+ value = (Object) method.invoke(v);
+ } catch (IllegalArgumentException ex) {
+ value = null;
+ }
+ if (value != null) {
+ params.put(method.getName(), value);
+ }
+ }
+ return params;
+ }
+
+ public static boolean hasConstructor(Class<?> clazz, Class<?> paramClass) {
+ Class<?>[] classes = { paramClass };
+ try {
+ clazz.getConstructor(classes);
+ } catch (NoSuchMethodException e) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
new file mode 100644
index 0000000..f08cbba
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+public class ConfigValidationAnnotations {
+
+ /**
+ * Validates on object is not null
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface NotNull {
+ Class<?> validatorClass() default ValidatorImpls.NotNullValidator.class;
+ }
+
+ /**
+ * Checks if a number is positive and whether zero inclusive Validator with fields: validatorClass, includeZero
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isPositiveNumber {
+ Class<?> validatorClass() default ValidatorImpls.PositiveNumberValidator.class;
+
+ boolean includeZero() default false;
+ }
+
+
+ /**
+ * Checks if resources specified are valid
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isValidResources {
+
+ Class<?> validatorClass() default ValidatorImpls.ResourcesValidator.class;
+ }
+
+ /**
+ * validates each entry in a list is of a certain type
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isListEntryType {
+ Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
+
+ Class<?> type();
+ }
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isStringList {
+ Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
+
+ Class<?> type() default String.class;
+ }
+
+ /**
+ * Validates each entry in a list with a list of validators Validators with fields: validatorClass and entryValidatorClass
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isListEntryCustom {
+ Class<?> validatorClass() default ValidatorImpls.ListEntryCustomValidator.class;
+
+ Class<?>[] entryValidatorClasses();
+ }
+
+
+ /**
+ * Validates the type of each key and value in a map Validator with fields: validatorClass, keyValidatorClass, valueValidatorClass
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isMapEntryType {
+ Class<?> validatorClass() default ValidatorImpls.MapEntryTypeValidator.class;
+
+ Class<?> keyType();
+
+ Class<?> valueType();
+ }
+
+ /**
+ * Checks if class name is assignable to the provided class/interfaces
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isImplementationOfClass {
+ Class<?> validatorClass() default ValidatorImpls.ImplementsClassValidator.class;
+
+ Class<?> implementsClass();
+ }
+
+ /**
+ * Checks if class name is assignable to ONE of the provided list class/interfaces
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isImplementationOfClasses {
+ Class<?> validatorClass() default ValidatorImpls.ImplementsClassesValidator.class;
+
+ Class<?>[] implementsClasses();
+ }
+
+ /**
+ * Validates a each key and value in a Map with a list of validators Validator with fields: validatorClass, keyValidatorClasses,
+ * valueValidatorClasses
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isMapEntryCustom {
+ Class<?> validatorClass() default ValidatorImpls.MapEntryCustomValidator.class;
+
+ Class<?>[] keyValidatorClasses();
+
+ Class<?>[] valueValidatorClasses();
+ }
+
+ /**
+ * checks if the topic name is valid
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isValidTopicName {
+ Class<?> validatorClass() default ValidatorImpls.TopicNameValidator.class;
+ }
+
+ /**
+ * checks if window configs is valid
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isValidWindowConfig {
+ Class<?> validatorClass() default ValidatorImpls.WindowConfigValidator.class;
+ }
+
+ /**
+ * checks function config as a whole to make sure all fields are valid
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target({ElementType.TYPE})
+ public @interface isValidFunctionConfig {
+ Class<?> validatorClass() default ValidatorImpls.FunctionConfigValidator.class;
+ }
+
+ /**
+ * Field names for annotations
+ */
+ public static class ValidatorParams {
+ static final String VALIDATOR_CLASS = "validatorClass";
+ static final String TYPE = "type";
+ static final String BASE_TYPE = "baseType";
+ static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses";
+ static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses";
+ static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses";
+ static final String KEY_TYPE = "keyType";
+ static final String VALUE_TYPE = "valueType";
+ static final String INCLUDE_ZERO = "includeZero";
+ static final String ACCEPTED_VALUES = "acceptedValues";
+ static final String IMPLEMENTS_CLASS = "implementsClass";
+ static final String IMPLEMENTS_CLASSES = "implementsClasses";
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java
new file mode 100644
index 0000000..f81cf06
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import java.util.Map;
+
+public class ConfigValidationUtils {
+ /**
+ * Returns a new NestableFieldValidator for a given class.
+ *
+ * @param cls the Class the field should be a type of
+ * @param notNull whether or not a value of null is valid
+ * @return a NestableFieldValidator for that class
+ */
+ public static NestableFieldValidator fv(final Class cls, final boolean notNull) {
+ return new NestableFieldValidator() {
+ @Override
+ public void validateField(String pd, String name, Object field)
+ throws IllegalArgumentException {
+ if (field == null) {
+ if (notNull) {
+ throw new IllegalArgumentException("Field " + name + " must not be null");
+ } else {
+ return;
+ }
+ }
+ if (!cls.isInstance(field)) {
+ throw new IllegalArgumentException(
+ pd + name + " must be a " + cls.getName() + ". (" + field + ")");
+ }
+ }
+ };
+ }
+
+ /**
+ * Returns a new NestableFieldValidator for a List of the given Class.
+ *
+ * @param cls the Class of elements composing the list
+ * @param notNull whether or not a value of null is valid
+ * @return a NestableFieldValidator for a list of the given class
+ */
+ public static NestableFieldValidator listFv(Class cls, boolean notNull) {
+ return listFv(fv(cls, notNull), notNull);
+ }
+
+ /**
+ * Returns a new NestableFieldValidator for a List where each item is validated by validator.
+ *
+ * @param validator used to validate each item in the list
+ * @param notNull whether or not a value of null is valid
+ * @return a NestableFieldValidator for a list with each item validated by a different validator.
+ */
+ public static NestableFieldValidator listFv(final NestableFieldValidator validator,
+ final boolean notNull) {
+ return new NestableFieldValidator() {
+ @Override
+ public void validateField(String pd, String name, Object field)
+ throws IllegalArgumentException {
+
+ if (field == null) {
+ if (notNull) {
+ throw new IllegalArgumentException("Field " + name + " must not be null");
+ } else {
+ return;
+ }
+ }
+ if (field instanceof Iterable) {
+ for (Object e : (Iterable) field) {
+ validator.validateField(pd + "Each element of the list ", name, e);
+ }
+ return;
+ }
+ throw new IllegalArgumentException(
+ "Field " + name + " must be an Iterable but was " +
+ ((field == null) ? "null" : ("a " + field.getClass())));
+ }
+ };
+ }
+
+ /**
+ * Returns a new NestableFieldValidator for a Map of key to val.
+ *
+ * @param key the Class of keys in the map
+ * @param val the Class of values in the map
+ * @param notNull whether or not a value of null is valid
+ * @return a NestableFieldValidator for a Map of key to val
+ */
+ public static NestableFieldValidator mapFv(Class key, Class val,
+ boolean notNull) {
+ return mapFv(fv(key, false), fv(val, false), notNull);
+ }
+
+ /**
+ * Returns a new NestableFieldValidator for a Map.
+ *
+ * @param key a validator for the keys in the map
+ * @param val a validator for the values in the map
+ * @param notNull whether or not a value of null is valid
+ * @return a NestableFieldValidator for a Map
+ */
+ public static NestableFieldValidator mapFv(final NestableFieldValidator key,
+ final NestableFieldValidator val, final boolean notNull) {
+ return new NestableFieldValidator() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void validateField(String pd, String name, Object field)
+ throws IllegalArgumentException {
+ if (field == null) {
+ if (notNull) {
+ throw new IllegalArgumentException("Field " + name + " must not be null");
+ } else {
+ return;
+ }
+ }
+ if (field instanceof Map) {
+ for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) field).entrySet()) {
+ key.validateField("Each key of the map ", name, entry.getKey());
+ val.validateField("Each value in the map ", name, entry.getValue());
+ }
+ return;
+ }
+ throw new IllegalArgumentException(
+ "Field " + name + " must be a Map");
+ }
+ };
+ }
+
+ /**
+ * Declares methods for validating configuration values.
+ */
+ public static interface FieldValidator {
+ /**
+ * Validates the given field.
+ *
+ * @param name the name of the field.
+ * @param field The field to be validated.
+ * @throws IllegalArgumentException if the field fails validation.
+ */
+ public void validateField(String name, Object field) throws IllegalArgumentException;
+ }
+
+ /**
+ * Declares a method for validating configuration values that is nestable.
+ */
+ public static abstract class NestableFieldValidator implements FieldValidator {
+ @Override
+ public void validateField(String name, Object field) throws IllegalArgumentException {
+ validateField(null, name, field);
+ }
+
+ /**
+ * Validates the given field.
+ *
+ * @param pd describes the parent wrapping this validator.
+ * @param name the name of the field.
+ * @param field The field to be validated.
+ * @throws IllegalArgumentException if the field fails validation.
+ */
+ public abstract void validateField(String pd, String name, Object field) throws IllegalArgumentException;
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
new file mode 100644
index 0000000..5941048
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import java.util.Map;
+
+public abstract class Validator {
+ public Validator(Map<String, Object> params) {
+ }
+
+ public Validator() {
+ }
+
+ public abstract void validateField(String name, Object o);
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
new file mode 100644
index 0000000..64c23f5
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -0,0 +1,655 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Resources;
+import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.WindowConfig;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+
+@Slf4j
+public class ValidatorImpls {
+ /**
+ * Validates a positive number.
+ */
+ public static class PositiveNumberValidator extends Validator {
+
+ private boolean includeZero;
+
+ public PositiveNumberValidator() {
+ this.includeZero = false;
+ }
+
+ public PositiveNumberValidator(Map<String, Object> params) {
+ this.includeZero = (boolean) params.get(ConfigValidationAnnotations.ValidatorParams.INCLUDE_ZERO);
+ }
+
+ public static void validateField(String name, boolean includeZero, Object o) {
+ if (o == null) {
+ return;
+ }
+ if (o instanceof Number) {
+ if (includeZero) {
+ if (((Number) o).doubleValue() >= 0.0) {
+ return;
+ }
+ } else {
+ if (((Number) o).doubleValue() > 0.0) {
+ return;
+ }
+ }
+ }
+ throw new IllegalArgumentException(String.format("Field '%s' must be a Positive Number", name));
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ validateField(name, this.includeZero, o);
+ }
+ }
+
+ /**
+ * Validates if an object is not null.
+ */
+
+ public static class NotNullValidator extends Validator {
+
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name));
+ }
+ }
+ }
+
+ @NoArgsConstructor
+ public static class ResourcesValidator extends Validator {
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name));
+ }
+
+ if (o instanceof Resources) {
+ Resources resources = (Resources) o;
+ Double cpu = resources.getCpu();
+ Long ram = resources.getRam();
+ Long disk = resources.getDisk();
+ com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0.0,
+ "The cpu allocation for the function must be positive");
+ com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0L,
+ "The ram allocation for the function must be positive");
+ com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0L,
+ "The disk allocation for the function must be positive");
+ } else {
+ throw new IllegalArgumentException(String.format("Field '%s' must be of Resource type!", name));
+ }
+ }
+ }
+
+ /**
+ * Validates each entry in a list.
+ */
+ public static class ListEntryTypeValidator extends Validator {
+
+ private Class<?> type;
+
+ public ListEntryTypeValidator(Map<String, Object> params) {
+ this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+ }
+
+ public static void validateField(String name, Class<?> type, Object o) {
+ ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.listFv(type, false);
+ validator.validateField(name, o);
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ validateField(name, this.type, o);
+ }
+ }
+
+ /**
+ * validates each key and value in a map of a certain type.
+ */
+ public static class MapEntryTypeValidator extends Validator {
+
+ private Class<?> keyType;
+ private Class<?> valueType;
+
+ public MapEntryTypeValidator(Map<String, Object> params) {
+ this.keyType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE);
+ this.valueType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE);
+ }
+
+ public static void validateField(String name, Class<?> keyType, Class<?> valueType, Object o) {
+ ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(keyType, valueType, false);
+ validator.validateField(name, o);
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ validateField(name, this.keyType, this.valueType, o);
+ }
+ }
+
+ public static class ImplementsClassValidator extends Validator {
+
+ Class<?> classImplements;
+
+ public ImplementsClassValidator(Map<String, Object> params) {
+ this.classImplements = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
+ }
+
+ public ImplementsClassValidator(Class<?> classImplements) {
+ this.classImplements = classImplements;
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ return;
+ }
+ SimpleTypeValidator.validateField(name, String.class, o);
+ String className = (String) o;
+ try {
+ ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+ Class<?> objectClass = clsLoader.loadClass(className);
+ if (!this.classImplements.isAssignableFrom(objectClass)) {
+ throw new IllegalArgumentException(
+ String.format("Field '%s' with value '%s' does not implement %s ",
+ name, o, this.classImplements.getName()));
+ }
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * validates class implements one of these classes
+ */
+ public static class ImplementsClassesValidator extends Validator {
+
+ Class<?>[] classesImplements;
+
+ public ImplementsClassesValidator(Map<String, Object> params) {
+ this.classesImplements = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASSES);
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ return;
+ }
+ SimpleTypeValidator.validateField(name, String.class, o);
+ String className = (String) o;
+ int count = 0;
+ for (Class<?> classImplements : classesImplements) {
+ Class<?> objectClass = null;
+ try {
+ objectClass = loadClass(className);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Cannot find/load class " + className);
+ }
+
+ if (classImplements.isAssignableFrom(objectClass)) {
+ count++;
+ }
+ }
+ if (count == 0) {
+ throw new IllegalArgumentException(
+ String.format("Field '%s' with value '%s' does not implement any of these classes %s",
+ name, o, classesImplements));
+ }
+ }
+ }
+
+ @NoArgsConstructor
+ public static class SerdeValidator extends Validator {
+
+ @Override
+ public void validateField(String name, Object o) {
+ new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, o);
+ }
+ }
+
+ /**
+ * validates each key and each value against the respective arrays of validators.
+ */
+ public static class MapEntryCustomValidator extends Validator {
+
+ private Class<?>[] keyValidators;
+ private Class<?>[] valueValidators;
+
+ public MapEntryCustomValidator(Map<String, Object> params) {
+ this.keyValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES);
+ this.valueValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o)
+ throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+ if (o == null) {
+ return;
+ }
+ //check if Map
+ SimpleTypeValidator.validateField(name, Map.class, o);
+ for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) {
+ for (Class<?> kv : keyValidators) {
+ Object keyValidator = kv.getConstructor().newInstance();
+ if (keyValidator instanceof Validator) {
+ ((Validator) keyValidator).validateField(name + " Map key", entry.getKey());
+ } else {
+ log.warn(
+ "validator: {} cannot be used in MapEntryCustomValidator to validate keys. Individual entry validators must " +
+ "a instance of Validator class",
+ kv.getName());
+ }
+ }
+ for (Class<?> vv : valueValidators) {
+ Object valueValidator = vv.getConstructor().newInstance();
+ if (valueValidator instanceof Validator) {
+ ((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
+ } else {
+ log.warn(
+ "validator: {} cannot be used in MapEntryCustomValidator to validate values. Individual entry validators " +
+ "must a instance of Validator class",
+ vv.getName());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ try {
+ validateField(name, this.keyValidators, this.valueValidators, o);
+ } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @NoArgsConstructor
+ public static class StringValidator extends Validator {
+
+ private HashSet<String> acceptedValues = null;
+
+ public StringValidator(Map<String, Object> params) {
+
+ this.acceptedValues =
+ new HashSet<String>(Arrays.asList((String[]) params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES)));
+
+ if (this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) {
+ this.acceptedValues = null;
+ }
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ SimpleTypeValidator.validateField(name, String.class, o);
+ if (this.acceptedValues != null) {
+ if (!this.acceptedValues.contains((String) o)) {
+ throw new IllegalArgumentException(
+ "Field " + name + " is not an accepted value. Value: " + o + " Accepted values: " + this.acceptedValues);
+ }
+ }
+ }
+ }
+ @NoArgsConstructor
+ public static class FunctionConfigValidator extends Validator {
+
+ private static void doJavaChecks(FunctionConfig functionConfig, String name) {
+ Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig);
+
+ ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+ // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
+ // implements SerDe class
+ functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
+
+
+ Class<?> serdeClass;
+ try {
+ serdeClass = loadClass(inputSerializer);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ String.format("The input serialization/deserialization class %s does not exist",
+ inputSerializer));
+ }
+
+ try {
+ new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer);
+ } catch (IllegalArgumentException ex) {
+ throw new IllegalArgumentException(
+ String.format("The input serialization/deserialization class %s does not not implement %s",
+
+ inputSerializer, SerDe.class.getCanonicalName()));
+ }
+
+ if (inputSerializer.equals(DefaultSerDe.class.getName())) {
+ if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
+ throw new IllegalArgumentException("The default Serializer does not support type " +
+ typeArgs[0]);
+ }
+ } else {
+ SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
+ if (serDe == null) {
+ throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
+ inputSerializer));
+ }
+ Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+
+ // type inheritance information seems to be lost in generic type
+ // load the actual type class for verification
+ Class<?> fnInputClass;
+ Class<?> serdeInputClass;
+ try {
+ fnInputClass = Class.forName(typeArgs[0].getName(), true, clsLoader);
+ serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Failed to load type class", e);
+ }
+
+ if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+ throw new IllegalArgumentException("Serializer type mismatch " + typeArgs[0] + " vs " + serDeTypes[0]);
+ }
+ }
+ });
+ functionConfig.getInputs().forEach((topicName) -> {
+ if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
+ throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]);
+ }
+ });
+ if (!Void.class.equals(typeArgs[1])) {
+ if (functionConfig.getOutputSerdeClassName() == null
+ || functionConfig.getOutputSerdeClassName().isEmpty()
+ || functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
+ if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
+ throw new RuntimeException("Default Serializer does not support type " + typeArgs[1]);
+ }
+ } else {
+ SerDe serDe = (SerDe) Reflections.createInstance(functionConfig.getOutputSerdeClassName(),
+ clsLoader);
+ if (serDe == null) {
+ throw new IllegalArgumentException(String.format("SerDe class %s does not exist",
+ functionConfig.getOutputSerdeClassName()));
+ }
+ Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+
+ // type inheritance information seems to be lost in generic type
+ // load the actual type class for verification
+ Class<?> fnOutputClass;
+ Class<?> serdeOutputClass;
+ try {
+ fnOutputClass = Class.forName(typeArgs[1].getName(), true, clsLoader);
+ serdeOutputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Failed to load type class", e);
+ }
+
+ if (!serdeOutputClass.isAssignableFrom(fnOutputClass)) {
+ throw new RuntimeException("Serializer type mismatch " + typeArgs[1] + " vs " + serDeTypes[0]);
+ }
+ }
+ }
+ }
+
+ private static void doPythonChecks(FunctionConfig functionConfig, String name) {
+ if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
+ }
+
+ if (functionConfig.getWindowConfig() != null) {
+ throw new IllegalArgumentException("There is currently no support windowing in python");
+ }
+ }
+
+ private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
+ if (inputTopics.contains(outputTopic)) {
+ throw new IllegalArgumentException(
+ String.format("Output topic %s is also being used as an input topic (topics must be one or the other)",
+ outputTopic));
+ }
+ }
+
+ private static void doCommonChecks(FunctionConfig functionConfig) {
+ if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) {
+ throw new RuntimeException("No input topic(s) specified for the function");
+ }
+
+ // Ensure that topics aren't being used as both input and output
+ verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput());
+
+ if (functionConfig.getSubscriptionType() != null
+ && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER
+ && functionConfig.getProcessingGuarantees() != null
+ && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type");
+ }
+
+ WindowConfig windowConfig = functionConfig.getWindowConfig();
+ if (windowConfig != null) {
+ // set auto ack to false since windowing framework is responsible
+ // for acking and not the function framework
+ if (functionConfig.isAutoAck() == true) {
+ throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+ }
+ functionConfig.setAutoAck(false);
+ }
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ FunctionConfig functionConfig = (FunctionConfig) o;
+ doCommonChecks(functionConfig);
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+ doJavaChecks(functionConfig, name);
+ } else {
+ doPythonChecks(functionConfig, name);
+ }
+ }
+ }
+
+ /**
+ * Validates each entry in a list against a list of custom Validators. Each validator in the list of validators must inherit or be an
+ * instance of Validator class
+ */
+ public static class ListEntryCustomValidator extends Validator {
+
+ private Class<?>[] entryValidators;
+
+ public ListEntryCustomValidator(Map<String, Object> params) {
+ this.entryValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
+ }
+
+ public static void validateField(String name, Class<?>[] validators, Object o)
+ throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+ if (o == null) {
+ return;
+ }
+ //check if iterable
+ SimpleTypeValidator.validateField(name, Iterable.class, o);
+ for (Object entry : (Iterable<?>) o) {
+ for (Class<?> validator : validators) {
+ Object v = validator.getConstructor().newInstance();
+ if (v instanceof Validator) {
+ ((Validator) v).validateField(name + " list entry", entry);
+ } else {
+ log.warn(
+ "validator: {} cannot be used in ListEntryCustomValidator. Individual entry validators must a instance of " +
+ "Validator class",
+ validator.getName());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ try {
+ validateField(name, this.entryValidators, o);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @NoArgsConstructor
+ public static class TopicNameValidator extends Validator {
+
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ return;
+ }
+ new StringValidator().validateField(name, o);
+ String topic = (String) o;
+ if (!TopicName.isValid(topic)) {
+ throw new IllegalArgumentException(
+ String.format("The topic name %s is invalid for field '%s'", topic, name));
+ }
+ }
+ }
+
+ public static class WindowConfigValidator extends Validator{
+
+ public static void validateWindowConfig(WindowConfig windowConfig) {
+ if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) {
+ throw new IllegalArgumentException("Window length is not specified");
+ }
+
+ if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) {
+ throw new IllegalArgumentException(
+ "Window length for time and count are set! Please set one or the other.");
+ }
+
+ if (windowConfig.getWindowLengthCount() != null) {
+ if (windowConfig.getWindowLengthCount() <= 0) {
+ throw new IllegalArgumentException(
+ "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
+ }
+ }
+
+ if (windowConfig.getWindowLengthDurationMs() != null) {
+ if (windowConfig.getWindowLengthDurationMs() <= 0) {
+ throw new IllegalArgumentException(
+ "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
+ }
+ }
+
+ if (windowConfig.getSlidingIntervalCount() != null) {
+ if (windowConfig.getSlidingIntervalCount() <= 0) {
+ throw new IllegalArgumentException(
+ "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
+ }
+ }
+
+ if (windowConfig.getSlidingIntervalDurationMs() != null) {
+ if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
+ throw new IllegalArgumentException(
+ "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
+ }
+ }
+
+ if (windowConfig.getTimestampExtractorClassName() != null) {
+ if (windowConfig.getMaxLagMs() != null) {
+ if (windowConfig.getMaxLagMs() < 0) {
+ throw new IllegalArgumentException(
+ "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
+ }
+ }
+ if (windowConfig.getWatermarkEmitIntervalMs() != null) {
+ if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
+ throw new IllegalArgumentException(
+ "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
+ }
+ }
+ }
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ return;
+ }
+ if (!(o instanceof WindowConfig)) {
+ throw new IllegalArgumentException(String.format("Field '%s' must be of WindowConfig type!", name));
+ }
+ WindowConfig windowConfig = (WindowConfig) o;
+ validateWindowConfig(windowConfig);
+ }
+ }
+
+ /**
+ * Validates basic types.
+ */
+ public static class SimpleTypeValidator extends Validator {
+
+ private Class<?> type;
+
+ public SimpleTypeValidator(Map<String, Object> params) {
+ this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+ }
+
+ public static void validateField(String name, Class<?> type, Object o) {
+ if (o == null) {
+ return;
+ }
+ if (type.isInstance(o)) {
+ return;
+ }
+ throw new IllegalArgumentException(
+ "Field " + name + " must be of type " + type + ". Object: " + o + " actual type: " + o.getClass());
+ }
+
+ @Override
+ public void validateField(String name, Object o) {
+ validateField(name, this.type, o);
+ }
+ }
+
+ private static Class<?> loadClass(String className) throws ClassNotFoundException {
+ Class<?> objectClass;
+ try {
+ objectClass = Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+ if (clsLoader != null) {
+ objectClass = clsLoader.loadClass(className);
+ } else {
+ throw e;
+ }
+ }
+ return objectClass;
+ }
+}
--
To stop receiving notification emails like this one, please contact
jerrypeng@apache.org.