You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/05/31 16:20:17 UTC
[incubator-pulsar] branch master updated: fix for python config
validation (#1874)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a1ffa7f fix for python config validation (#1874)
a1ffa7f is described below
commit a1ffa7ff931e0ca7da8ec08452c6a3fc084212ea
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu May 31 09:20:13 2018 -0700
fix for python config validation (#1874)
* fix for python config validation
* removing unnecessary file
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +-
.../pulsar/functions/utils/FunctionConfig.java | 4 +-
.../utils/validation/ConfigValidation.java | 100 ++++++++++++++++-----
.../validation/ConfigValidationAnnotations.java | 40 ++++++++-
4 files changed, 120 insertions(+), 26 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 94e17ad..3181290 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -385,7 +385,7 @@ public class CmdFunctions extends CmdBase {
try {
// Need to load jar and set context class loader before calling
- ConfigValidation.validateConfig(functionConfig);
+ ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name());
} catch (Exception e) {
throw new ParameterException(e.getMessage());
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 2940e32..2c97e65 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -25,6 +25,7 @@ import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
@@ -86,7 +87,8 @@ public class FunctionConfig {
@isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
private Collection<String> inputs;
@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
- valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
+ valueValidatorClasses = { ValidatorImpls.SerdeValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA)
+ @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON)
private Map<String, String> customSerdeInputs;
@isValidTopicName
private String output;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
index b665d1a..fe89a01 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
@@ -19,40 +19,76 @@
package org.apache.pulsar.functions.utils.validation;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.utils.FunctionConfig;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.ACTUAL_RUNTIME;
+import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.TARGET_RUNTIME;
+import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS;
+
@Slf4j
public class ConfigValidation {
- public static void validateConfig(Object config) {
+ public enum Runtime {
+ ALL,
+ JAVA,
+ PYTHON
+ }
+
+ public static void validateConfig(Object config, String runtimeType) {
for (Field field : config.getClass().getDeclaredFields()) {
- Object value = null;
+ Object value;
field.setAccessible(true);
try {
value = field.get(config);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
- validateField(field, value);
+ validateField(field, value, Runtime.valueOf(runtimeType));
}
- validateClass(config);
+ validateClass(config, Runtime.valueOf(runtimeType));
}
- private static void validateClass(Object config) {
- processAnnotations(config.getClass().getAnnotations(), config.getClass().getName(), config);
+ private static void validateClass(Object config, Runtime runtime) {
+
+ List<Annotation> annotationList = new LinkedList<>();
+ Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
+ for (Class clazz : classes) {
+ try {
+ Annotation[] anots = config.getClass().getAnnotationsByType(clazz);
+ annotationList.addAll(Arrays.asList(anots));
+ } catch (ClassCastException e) {
+
+ }
+ }
+ processAnnotations(annotationList, config.getClass().getName(), config, runtime);
}
- private static void validateField(Field field, Object value) {
- processAnnotations(field.getAnnotations(), field.getName(), value);
+ private static void validateField(Field field, Object value, Runtime runtime) {
+ List<Annotation> annotationList = new LinkedList<>();
+ Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
+ for (Class clazz : classes) {
+ try {
+ Annotation[] anots = field.getAnnotationsByType(clazz);
+ annotationList.addAll(Arrays.asList(anots));
+ } catch (ClassCastException e) {
+
+ }
+ }
+ processAnnotations(annotationList, field.getName(), value, runtime);
}
- private static void processAnnotations(Annotation[] annotations, String fieldName, Object value) {
+ private static void processAnnotations( List<Annotation> annotations, String fieldName, Object value,
+ Runtime runtime) {
try {
for (Annotation annotation : annotations) {
@@ -68,20 +104,31 @@ public class ConfigValidation {
}
if (validatorClass != null) {
Object v = validatorClass.cast(annotation);
- @SuppressWarnings("unchecked")
- Class<Validator> clazz = (Class<Validator>) validatorClass
- .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
- Validator o = null;
- Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
- //two constructor signatures used to initialize validators.
- //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
- //If validator has a constructor that takes a Map as an argument call that constructor
- if (hasConstructor(clazz, Map.class)) {
- o = clazz.getConstructor(Map.class).newInstance(params);
- } else { //If not call default constructor
- o = clazz.newInstance();
+ if (hasMethod(validatorClass, VALIDATOR_CLASS)) {
+
+ @SuppressWarnings("unchecked")
+ Class<Validator> clazz = (Class<Validator>) validatorClass
+ .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
+ Validator o = null;
+ Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
+
+ if (params.containsKey(TARGET_RUNTIME)
+ && params.get(TARGET_RUNTIME) != Runtime.ALL
+ && params.get(TARGET_RUNTIME) != runtime) {
+ continue;
+ }
+ params.put(ACTUAL_RUNTIME, runtime);
+ //two constructor signatures used to initialize validators.
+ //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
+
+ //If validator has a constructor that takes a Map as an argument call that constructor
+ if (hasConstructor(clazz, Map.class)) {
+ o = clazz.getConstructor(Map.class).newInstance(params);
+ } else { //If not call default constructor
+ o = clazz.newInstance();
+ }
+ o.validateField(fieldName, value);
}
- o.validateField(fieldName, value);
}
}
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
@@ -89,6 +136,15 @@ public class ConfigValidation {
}
}
+ private static boolean hasMethod(Class<?> clazz, String method) {
+ try {
+ clazz.getMethod(method);
+ return true;
+ } catch (NoSuchMethodException e) {
+ return false;
+ }
+ }
+
private static Map<String, Object> getParamsFromAnnotation(Class<?> validatorClass, Object v)
throws InvocationTargetException, IllegalAccessException {
Map<String, Object> params = new HashMap<String, Object>();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
index f08cbba..b01331e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -18,7 +18,10 @@
*/
package org.apache.pulsar.functions.utils.validation;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+
import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@@ -32,6 +35,8 @@ public class ConfigValidationAnnotations {
@Target(ElementType.FIELD)
public @interface NotNull {
Class<?> validatorClass() default ValidatorImpls.NotNullValidator.class;
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}
/**
@@ -43,6 +48,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.PositiveNumberValidator.class;
boolean includeZero() default false;
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}
@@ -54,6 +61,8 @@ public class ConfigValidationAnnotations {
public @interface isValidResources {
Class<?> validatorClass() default ValidatorImpls.ResourcesValidator.class;
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}
/**
@@ -65,6 +74,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
Class<?> type();
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}
@Retention(RetentionPolicy.RUNTIME)
@@ -73,6 +84,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
Class<?> type() default String.class;
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}
/**
@@ -84,6 +97,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ListEntryCustomValidator.class;
Class<?>[] entryValidatorClasses();
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}
@@ -98,6 +113,8 @@ public class ConfigValidationAnnotations {
Class<?> keyType();
Class<?> valueType();
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}
/**
@@ -109,6 +126,8 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ImplementsClassValidator.class;
Class<?> implementsClass();
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
}
/**
@@ -120,20 +139,31 @@ public class ConfigValidationAnnotations {
Class<?> validatorClass() default ValidatorImpls.ImplementsClassesValidator.class;
Class<?>[] implementsClasses();
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
}
/**
* Validates a each key and value in a Map with a list of validators Validator with fields: validatorClass, keyValidatorClasses,
* valueValidatorClasses
*/
+ @Repeatable(isMapEntryCustoms.class)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isMapEntryCustom {
Class<?> validatorClass() default ValidatorImpls.MapEntryCustomValidator.class;
- Class<?>[] keyValidatorClasses();
+ Class<?>[] keyValidatorClasses() default {};
+
+ Class<?>[] valueValidatorClasses() default {};
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
+ }
- Class<?>[] valueValidatorClasses();
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
+ public @interface isMapEntryCustoms {
+ isMapEntryCustom[] value();
}
/**
@@ -143,6 +173,8 @@ public class ConfigValidationAnnotations {
@Target(ElementType.FIELD)
public @interface isValidTopicName {
Class<?> validatorClass() default ValidatorImpls.TopicNameValidator.class;
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}
/**
@@ -152,6 +184,8 @@ public class ConfigValidationAnnotations {
@Target(ElementType.FIELD)
public @interface isValidWindowConfig {
Class<?> validatorClass() default ValidatorImpls.WindowConfigValidator.class;
+
+ ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
}
/**
@@ -179,5 +213,7 @@ public class ConfigValidationAnnotations {
static final String ACCEPTED_VALUES = "acceptedValues";
static final String IMPLEMENTS_CLASS = "implementsClass";
static final String IMPLEMENTS_CLASSES = "implementsClasses";
+ static final String ACTUAL_RUNTIME = "actualRuntime";
+ static final String TARGET_RUNTIME = "targetRuntime";
}
}
--
To stop receiving notification emails like this one, please contact
jerrypeng@apache.org.