You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/31 16:20:16 UTC

[GitHub] jerrypeng closed pull request #1874: fix for python config validation

jerrypeng closed pull request #1874:  fix for python config validation
URL: https://github.com/apache/incubator-pulsar/pull/1874
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 94e17ad17b..31812904d8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -385,7 +385,7 @@ private void validateFunctionConfigs(FunctionConfig functionConfig) {
 
             try {
                 // Need to load jar and set context class loader before calling
-                ConfigValidation.validateConfig(functionConfig);
+                ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name());
             } catch (Exception e) {
                 throw new ParameterException(e.getMessage());
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 2940e32da5..2c97e65bbb 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -25,6 +25,7 @@
 import lombok.ToString;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
@@ -86,7 +87,8 @@
     @isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
     private Collection<String> inputs;
     @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
+            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA)
+    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON)
     private Map<String, String> customSerdeInputs;
     @isValidTopicName
     private String output;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
index b665d1a69c..fe89a013fb 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
@@ -19,40 +19,76 @@
 package org.apache.pulsar.functions.utils.validation;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.ACTUAL_RUNTIME;
+import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.TARGET_RUNTIME;
+import static org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS;
+
 @Slf4j
 public class ConfigValidation {
 
-    public static void validateConfig(Object config) {
+    public enum Runtime {
+        ALL,
+        JAVA,
+        PYTHON
+    }
+
+    public static void validateConfig(Object config, String runtimeType) {
         for (Field field : config.getClass().getDeclaredFields()) {
-            Object value = null;
+            Object value;
             field.setAccessible(true);
             try {
                 value = field.get(config);
             } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
             }
-            validateField(field, value);
+            validateField(field, value, Runtime.valueOf(runtimeType));
         }
-        validateClass(config);
+        validateClass(config, Runtime.valueOf(runtimeType));
     }
 
-    private static void validateClass(Object config) {
-        processAnnotations(config.getClass().getAnnotations(), config.getClass().getName(), config);
+    private static void validateClass(Object config, Runtime runtime) {
+
+        List<Annotation> annotationList = new LinkedList<>();
+        Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
+        for (Class clazz : classes) {
+            try {
+                Annotation[] anots = config.getClass().getAnnotationsByType(clazz);
+                annotationList.addAll(Arrays.asList(anots));
+            } catch (ClassCastException e) {
+
+            }
+        }
+        processAnnotations(annotationList, config.getClass().getName(), config, runtime);
     }
 
-    private static void validateField(Field field, Object value) {
-        processAnnotations(field.getAnnotations(), field.getName(), value);
+    private static void validateField(Field field, Object value, Runtime runtime) {
+        List<Annotation> annotationList = new LinkedList<>();
+        Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
+        for (Class clazz : classes) {
+            try {
+                Annotation[] anots = field.getAnnotationsByType(clazz);
+                annotationList.addAll(Arrays.asList(anots));
+            } catch (ClassCastException e) {
+
+            }
+        }
+        processAnnotations(annotationList, field.getName(), value, runtime);
     }
 
-    private static void processAnnotations(Annotation[] annotations, String fieldName, Object value) {
+    private static void processAnnotations( List<Annotation> annotations, String fieldName, Object value,
+                                           Runtime runtime) {
         try {
             for (Annotation annotation : annotations) {
 
@@ -68,20 +104,31 @@ private static void processAnnotations(Annotation[] annotations, String fieldNam
                 }
                 if (validatorClass != null) {
                     Object v = validatorClass.cast(annotation);
-                    @SuppressWarnings("unchecked")
-                    Class<Validator> clazz = (Class<Validator>) validatorClass
-                            .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
-                    Validator o = null;
-                    Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
-                    //two constructor signatures used to initialize validators.
-                    //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
-                    //If validator has a constructor that takes a Map as an argument call that constructor
-                    if (hasConstructor(clazz, Map.class)) {
-                        o = clazz.getConstructor(Map.class).newInstance(params);
-                    } else { //If not call default constructor
-                        o = clazz.newInstance();
+                    if (hasMethod(validatorClass, VALIDATOR_CLASS)) {
+
+                        @SuppressWarnings("unchecked")
+                        Class<Validator> clazz = (Class<Validator>) validatorClass
+                                .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
+                        Validator o = null;
+                        Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
+
+                        if (params.containsKey(TARGET_RUNTIME)
+                                && params.get(TARGET_RUNTIME) != Runtime.ALL
+                                && params.get(TARGET_RUNTIME) != runtime) {
+                            continue;
+                        }
+                        params.put(ACTUAL_RUNTIME, runtime);
+                        //two constructor signatures used to initialize validators.
+                        //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
+
+                        //If validator has a constructor that takes a Map as an argument call that constructor
+                        if (hasConstructor(clazz, Map.class)) {
+                            o = clazz.getConstructor(Map.class).newInstance(params);
+                        } else { //If not call default constructor
+                            o = clazz.newInstance();
+                        }
+                        o.validateField(fieldName, value);
                     }
-                    o.validateField(fieldName, value);
                 }
             }
         } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
@@ -89,6 +136,15 @@ private static void processAnnotations(Annotation[] annotations, String fieldNam
         }
     }
 
+    private static boolean hasMethod(Class<?> clazz, String method) {
+        try {
+            clazz.getMethod(method);
+            return true;
+        } catch (NoSuchMethodException e) {
+           return false;
+        }
+    }
+
     private static Map<String, Object> getParamsFromAnnotation(Class<?> validatorClass, Object v)
             throws InvocationTargetException, IllegalAccessException {
         Map<String, Object> params = new HashMap<String, Object>();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
index f08cbba398..b01331e53e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.functions.utils.validation;
 
+import org.apache.pulsar.functions.utils.FunctionConfig;
+
 import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
@@ -32,6 +35,8 @@
     @Target(ElementType.FIELD)
     public @interface NotNull {
         Class<?> validatorClass() default ValidatorImpls.NotNullValidator.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -43,6 +48,8 @@
         Class<?> validatorClass() default ValidatorImpls.PositiveNumberValidator.class;
 
         boolean includeZero() default false;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
 
@@ -54,6 +61,8 @@
     public @interface isValidResources {
 
         Class<?> validatorClass() default ValidatorImpls.ResourcesValidator.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -65,6 +74,8 @@
         Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
 
         Class<?> type();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     @Retention(RetentionPolicy.RUNTIME)
@@ -73,6 +84,8 @@
         Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class;
 
         Class<?> type() default String.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -84,6 +97,8 @@
         Class<?> validatorClass() default ValidatorImpls.ListEntryCustomValidator.class;
 
         Class<?>[] entryValidatorClasses();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
 
@@ -98,6 +113,8 @@
         Class<?> keyType();
 
         Class<?> valueType();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -109,6 +126,8 @@
         Class<?> validatorClass() default ValidatorImpls.ImplementsClassValidator.class;
 
         Class<?> implementsClass();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
     }
 
     /**
@@ -120,20 +139,31 @@
         Class<?> validatorClass() default ValidatorImpls.ImplementsClassesValidator.class;
 
         Class<?>[] implementsClasses();
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.JAVA;
     }
 
     /**
      * Validates a each key and value in a Map with a list of validators Validator with fields: validatorClass, keyValidatorClasses,
      * valueValidatorClasses
      */
+    @Repeatable(isMapEntryCustoms.class)
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isMapEntryCustom {
         Class<?> validatorClass() default ValidatorImpls.MapEntryCustomValidator.class;
 
-        Class<?>[] keyValidatorClasses();
+        Class<?>[] keyValidatorClasses() default {};
+
+        Class<?>[] valueValidatorClasses() default {};
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
+    }
 
-        Class<?>[] valueValidatorClasses();
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isMapEntryCustoms {
+        isMapEntryCustom[] value();
     }
 
     /**
@@ -143,6 +173,8 @@
     @Target(ElementType.FIELD)
     public @interface isValidTopicName {
         Class<?> validatorClass() default ValidatorImpls.TopicNameValidator.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -152,6 +184,8 @@
     @Target(ElementType.FIELD)
     public @interface isValidWindowConfig {
         Class<?> validatorClass() default ValidatorImpls.WindowConfigValidator.class;
+
+        ConfigValidation.Runtime targetRuntime() default ConfigValidation.Runtime.ALL;
     }
 
     /**
@@ -179,5 +213,7 @@
         static final String ACCEPTED_VALUES = "acceptedValues";
         static final String IMPLEMENTS_CLASS = "implementsClass";
         static final String IMPLEMENTS_CLASSES = "implementsClasses";
+        static final String ACTUAL_RUNTIME = "actualRuntime";
+        static final String TARGET_RUNTIME = "targetRuntime";
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services