You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/05/31 16:20:17 UTC

[incubator-pulsar] branch master updated: fix for python config validation (#1874)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1ffa7f   fix for python config validation (#1874)
a1ffa7f is described below

commit a1ffa7ff931e0ca7da8ec08452c6a3fc084212ea
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu May 31 09:20:13 2018 -0700

     fix for python config validation (#1874)
    
    * fix for python config validation
    
    * removing unnecessary file
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   2 +-
 .../pulsar/functions/utils/FunctionConfig.java     |   4 +-
 .../utils/validation/ConfigValidation.java         | 100 ++++++++++++++++-----
 .../validation/ConfigValidationAnnotations.java    |  40 ++++++++-
 4 files changed, 120 insertions(+), 26 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 94e17ad..3181290 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -385,7 +385,7 @@ public class CmdFunctions extends CmdBase {
 
             try {
                 // Need to load jar and set context class loader before calling
-                ConfigValidation.validateConfig(functionConfig);
+                ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name());
             } catch (Exception e) {
                 throw new ParameterException(e.getMessage());
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 2940e32..2c97e65 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -25,6 +25,7 @@ import lombok.Setter;
 import lombok.ToString;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
@@ -86,7 +87,8 @@ public class FunctionConfig {
     @isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
     private Collection<String> inputs;
     @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
+            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA)
+    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON)
     private Map<String, String> customSerdeInputs;
     @isValidTopicName
     private String output;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
index b665d1a..fe89a01 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
@@ -19,40 +19,76 @@
 package org.apache.pulsar.functions.utils.validation;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.ACTUAL_RUNTIME;
+import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.TARGET_RUNTIME;
+import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS;
+
 @Slf4j
 public class ConfigValidation {
 
-    public static void validateConfig(Object config) {
+    public enum Runtime {
+        ALL,
+        JAVA,
+        PYTHON
+    }
+
+    public static void validateConfig(Object config, String runtimeType) {
         for (Field field : config.getClass().getDeclaredFields()) {
-            Object value = null;
+            Object value;
             field.setAccessible(true);
             try {
                 value = field.get(config);
             } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
             }
-            validateField(field, value);
+            validateField(field, value, Runtime.valueOf(runtimeType));
         }
-        validateClass(config);
+        validateClass(config, Runtime.valueOf(runtimeType));
     }
 
-    private static void validateClass(Object config) {
-        processAnnotations(config.getClass().getAnnotations(), config.getClass().getName(), config);
+    private static void validateClass(Object config, Runtime runtime) {
+
+        List<Annotation> annotationList = new LinkedList<>();
+        Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
+        for (Class clazz : classes) {
+            try {
+                Annotation[] anots = config.getClass().getAnnotationsByType(clazz);
+                annotationList.addAll(Arrays.asList(anots));
+            } catch (ClassCastException e) {
+
+            }
+        }
+        processAnnotations(annotationList, config.getClass().getName(), config, runtime);
     }
 
-    private static void validateField(Field field, Object value) {
-        processAnnotations(field.getAnnotations(), field.getName(), value);
+    private static void validateField(Field field, Object value, Runtime runtime) {
+        List<Annotation> annotationList = new LinkedList<>();
+        Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
+        for (Class clazz : classes) {
+            try {
+                Annotation[] anots = field.getAnnotationsByType(clazz);
+                annotationList.addAll(Arrays.asList(anots));
+            } catch (ClassCastException e) {
+
+            }
+        }
+        processAnnotations(annotationList, field.getName(), value, runtime);
     }
 
-    private static void processAnnotations(Annotation[] annotations, String fieldName, Object value) {
+    private static void processAnnotations( List<Annotation> annotations, String fieldName, Object value,
+                                           Runtime runtime) {
         try {
             for (Annotation annotation : annotations) {
 
@@ -68,20 +104,31 @@ public class ConfigValidation {
                 }
                 if (validatorClass != null) {
                     Object v = validatorClass.cast(annotation);
-                    @SuppressWarnings("unchecked")
-                    Class<Validator> clazz = (Class<Validator>) validatorClass
-                            .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
-                    Validator o = null;
-                    Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
-                    //two constructor signatures used to initialize validators.
-                    //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
-                    //If validator has a constructor that takes a Map as an argument call that constructor
-                    if (hasConstructor(clazz, Map.class)) {
-                        o = clazz.getConstructor(Map.class).newInstance(params);
-                    } else { //If not call default constructor
-                        o = clazz.newInstance();
+                    if (hasMethod(validatorClass, VALIDATOR_CLASS)) {
+
+                        @SuppressWarnings("unchecked")
+                        Class<Validator> clazz = (Class<Validator>) validatorClass
+                                .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
+                        Validator o = null;
+                        Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
+
+                        if (params.containsKey(TARGET_RUNTIME)
+                                && params.get(TARGET_RUNTIME) != Runtime.ALL
+                                && params.get(TARGET_RUNTIME) != runtime) {
+                            continue;
+                        }
+                        params.put(ACTUAL_RUNTIME, runtime);
+                        //two constructor signatures used to initialize validators.
+                        //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
+
+                        //If validator has a constructor that takes a Map as an argument call that constructor
+                        if (hasConstructor(clazz, Map.class)) {
+                            o = clazz.getConstructor(Map.class).newInstance(params);
+                        } else { //If not call default constructor
+                            o = clazz.newInstance();
+                        }
+                        o.validateField(fieldName, value);
                     }
-                    o.validateField(fieldName, value);
                 }
             }
         } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
@@ -89,6 +136,15 @@ public class ConfigValidation {
         }
     }
 
+    private static boolean hasMethod(Class<?> clazz, String method) {
+        try {
+            clazz.getMethod(method);
+            return true;
+        } catch (NoSuchMethodException e) {
+           return false;
+        }
+    }
+
     private static Map<String, Object> getParamsFromAnnotation(Class<?> validatorClass, Object v)
             throws InvocationTargetException, IllegalAccessException {
         Map<String, Object> params = new HashMap<String, Object>();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
index f08cbba..b01331e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.functions.utils.validation;
 
+import org.apache.pulsar.functions.utils.FunctionConfig;
+
 import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
@@ -32,6 +35,8 @@ public class ConfigValidationAnnotations {
     @Target(ElementType.FIELD)
     public @interface NotNull {
         Class<?> validatorClass() default ValidatorImpls.NotNullValidator.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -43,6 +48,8 @@ public class ConfigValidationAnnotations {
         Class<?> validatorClass() default ValidatorImpls.PositiveNumberValidator.class;
 
         boolean includeZero() default false;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
 
@@ -54,6 +61,8 @@ public class ConfigValidationAnnotations {
     public @interface isValidResources {
 
         Class<?> validatorClass() default ValidatorImpls.ResourcesValidator.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -65,6 +74,8 @@ public class ConfigValidationAnnotations {
         Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
 
         Class<?> type();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     @Retention(RetentionPolicy.RUNTIME)
@@ -73,6 +84,8 @@ public class ConfigValidationAnnotations {
         Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
 
         Class<?> type() default String.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -84,6 +97,8 @@ public class ConfigValidationAnnotations {
         Class<?> validatorClass() default ValidatorImpls.ListEntryCustomValidator.class;
 
         Class<?>[] entryValidatorClasses();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
 
@@ -98,6 +113,8 @@ public class ConfigValidationAnnotations {
         Class<?> keyType();
 
         Class<?> valueType();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -109,6 +126,8 @@ public class ConfigValidationAnnotations {
         Class<?> validatorClass() default ValidatorImpls.ImplementsClassValidator.class;
 
         Class<?> implementsClass();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
     }
 
     /**
@@ -120,20 +139,31 @@ public class ConfigValidationAnnotations {
         Class<?> validatorClass() default ValidatorImpls.ImplementsClassesValidator.class;
 
         Class<?>[] implementsClasses();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
     }
 
     /**
     * Validates each key and value in a Map with a list of validators. Validator with fields: validatorClass, keyValidatorClasses,
     * valueValidatorClasses
      */
+    @Repeatable(isMapEntryCustoms.class)
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isMapEntryCustom {
         Class<?> validatorClass() default ValidatorImpls.MapEntryCustomValidator.class;
 
-        Class<?>[] keyValidatorClasses();
+        Class<?>[] keyValidatorClasses() default {};
+
+        Class<?>[] valueValidatorClasses() default {};
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
+    }
 
-        Class<?>[] valueValidatorClasses();
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isMapEntryCustoms {
+        isMapEntryCustom[] value();
     }
 
     /**
@@ -143,6 +173,8 @@ public class ConfigValidationAnnotations {
     @Target(ElementType.FIELD)
     public @interface isValidTopicName {
         Class<?> validatorClass() default ValidatorImpls.TopicNameValidator.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -152,6 +184,8 @@ public class ConfigValidationAnnotations {
     @Target(ElementType.FIELD)
     public @interface isValidWindowConfig {
         Class<?> validatorClass() default ValidatorImpls.WindowConfigValidator.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -179,5 +213,7 @@ public class ConfigValidationAnnotations {
         static final String ACCEPTED_VALUES = "acceptedValues";
         static final String IMPLEMENTS_CLASS = "implementsClass";
         static final String IMPLEMENTS_CLASSES = "implementsClasses";
+        static final String ACTUAL_RUNTIME = "actualRuntime";
+        static final String TARGET_RUNTIME = "targetRuntime";
     }
 }

-- 
To stop receiving notification emails like this one, please contact
jerrypeng@apache.org.