You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/09/06 18:15:51 UTC

[incubator-pulsar] branch master updated: Fix validator logic to differentiate between serializer and deserializer (#2523)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e4aea8c  Fix validator logic to differentiate between serializer and deserializer (#2523)
e4aea8c is described below

commit e4aea8cd04f8052ff91ba50294b25d01d8943d99
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Sep 6 11:15:49 2018 -0700

    Fix validator logic to differentiate between serializer and deserializer (#2523)
    
    * Fix validator logic to differentiate between serializer and deserializer
    
    * Expand to include backend and schema
    
    * Fix build
    
    * Fix unittest
    
    * Fixed unittest
---
 .../pulsar/functions/instance/ContextImpl.java     |  4 +-
 .../pulsar/functions/instance/InstanceUtils.java   | 37 +++++++++----
 .../apache/pulsar/functions/sink/PulsarSink.java   |  4 +-
 .../pulsar/functions/source/PulsarSource.java      |  4 +-
 .../pulsar/functions/source/TopicSchema.java       | 18 +++----
 .../pulsar/functions/sink/PulsarSinkTest.java      |  2 +-
 .../pulsar/functions/source/PulsarSourceTest.java  |  2 +-
 .../functions/utils/validation/ValidatorImpls.java | 62 ++++++++++++++--------
 8 files changed, 80 insertions(+), 53 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index aec52fc..c4099f4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -254,13 +254,13 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object) {
-        return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object));
+        return publish(topicName, object, "");
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
-        return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName));
+        return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
     }
 
     @SuppressWarnings("unchecked")
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index b4a9bf9..86a9aa2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -31,27 +31,42 @@ import net.jodah.typetools.TypeResolver;
 
 @UtilityClass
 public class InstanceUtils {
-    public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg) {
+    public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg,
+                                           boolean deser) {
         SerDe<?> serDe = createInstance(serdeClassName, clsLoader, SerDe.class);
 
         Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-        checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
-                "Inconsistent types found between function input type and input serde type: "
-                        + " function type = " + typeArg + " should be assignable from "
-                        + inputSerdeTypeArgs[0]);
+        if (deser) {
+            checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+                    "Inconsistent types found between function input type and serde type: "
+                            + " function type = " + typeArg + " should be assignable from "
+                            + inputSerdeTypeArgs[0]);
+        } else {
+            checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+                    "Inconsistent types found between function input type and serde type: "
+                            + " serde type = " + inputSerdeTypeArgs[0] + " should be assignable from "
+                            + typeArg);
+        }
 
         return serDe;
     }
 
-    public static Schema<?> initializeCustomSchema(String schemaClassName, ClassLoader clsLoader, Class<?> typeArg) {
+    public static Schema<?> initializeCustomSchema(String schemaClassName, ClassLoader clsLoader, Class<?> typeArg,
+                                                   boolean input) {
         Schema<?> schema = createInstance(schemaClassName, clsLoader, Schema.class);
 
         Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
-        checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
-                "Inconsistent types found between function input type and input schema type: "
-                        + " function type = " + typeArg + " should be assignable from "
-                        + inputSerdeTypeArgs[0]);
-
+        if (input) {
+            checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+                    "Inconsistent types found between function type and schema type: "
+                            + " function type = " + typeArg + " should be assignable from "
+                            + inputSerdeTypeArgs[0]);
+        } else {
+            checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+                    "Inconsistent types found between function type and schema type: "
+                            + " schema type = " + inputSerdeTypeArgs[0] + " should be assignable from "
+                            + typeArg);
+        }
         return schema;
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 5ec725c..835e288 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -277,10 +277,10 @@ public class PulsarSink<T> implements Sink<T> {
 
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
             return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
-                    pulsarSinkConfig.getSchemaType());
+                    pulsarSinkConfig.getSchemaType(), false);
         } else {
             return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
-                    pulsarSinkConfig.getSerdeClassName());
+                    pulsarSinkConfig.getSerdeClassName(), false);
         }
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 244ab70..e1059f3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -162,9 +162,9 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
         pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> {
             Schema<T> schema;
             if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
-                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName());
+                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true);
             } else {
-                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType());
+                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true);
             }
             configs.put(topic,
                     ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).build());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 1802ee5..2ac5b65 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -51,16 +51,12 @@ public class TopicSchema {
 
     public static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";
 
-    public Schema<?> getSchema(String topic, Object object) {
-        return getSchema(topic, object.getClass(), "");
+    public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName, boolean input) {
+        return getSchema(topic, object.getClass(), schemaTypeOrClassName, input);
     }
 
-    public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName) {
-        return getSchema(topic, object.getClass(), schemaTypeOrClassName);
-    }
-
-    public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName) {
-        return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName));
+    public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName, boolean input) {
+        return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName, input));
     }
 
     public Schema<?> getSchema(String topic, Class<?> clazz, Optional<SchemaType> schemaType) {
@@ -134,7 +130,7 @@ public class TopicSchema {
     }
 
     @SuppressWarnings("unchecked")
-    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName) {
+    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
         // The schemaTypeOrClassName can represent multiple thing, either a schema type, a schema class name or a ser-de
         // class name.
 
@@ -161,11 +157,11 @@ public class TopicSchema {
         // First try with Schema
         try {
             return (Schema<T>) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
-                    Thread.currentThread().getContextClassLoader(), clazz);
+                    Thread.currentThread().getContextClassLoader(), clazz, input);
         } catch (Throwable t) {
             // Now try with Serde or just fail
             SerDe<T> serDe = (SerDe<T>) InstanceUtils.initializeSerDe(schemaTypeOrClassName,
-                    Thread.currentThread().getContextClassLoader(), clazz);
+                    Thread.currentThread().getContextClassLoader(), clazz, input);
             return new SerDeSchema<>(serDe);
         }
     }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 766dee3..72e3c56 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -140,7 +140,7 @@ public class PulsarSinkTest {
             fail("Should fail constructing java instance if function type is inconsistent with serde type");
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
-            assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:"));
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and serde type:"));
         } catch (Exception ex) {
             log.error("Exception: {}", ex, ex);
             assertTrue(false);
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 60f684f..e4825f2 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -159,7 +159,7 @@ public class PulsarSourceTest {
             fail("Should fail constructing java instance if function type is inconsistent with serde type");
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
-            assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:"));
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and serde type:"));
         } catch (Exception ex) {
             log.error("Exception: {}", ex, ex);
             assertTrue(false);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index f264d2a..f60f3c0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -381,7 +381,7 @@ public class ValidatorImpls {
             // implements SerDe class
             if (functionConfig.getCustomSerdeInputs() != null) {
                 functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
-                    validateSerde(inputSerializer, typeArgs[0], name, clsLoader);
+                    validateSerde(inputSerializer, typeArgs[0], name, clsLoader, true);
                 });
             }
 
@@ -389,7 +389,7 @@ public class ValidatorImpls {
             // implements SerDe class
             if (functionConfig.getCustomSchemaInputs() != null) {
                 functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> {
-                    validateSchema(schemaType, typeArgs[0], name, clsLoader);
+                    validateSchema(schemaType, typeArgs[0], name, clsLoader, true);
                 });
             }
 
@@ -405,10 +405,10 @@ public class ValidatorImpls {
                                 String.format("Only one of schemaType or serdeClassName should be set in inputSpec"));
                     }
                     if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
-                        validateSerde(conf.getSerdeClassName(), typeArgs[0], name, clsLoader);
+                        validateSerde(conf.getSerdeClassName(), typeArgs[0], name, clsLoader, true);
                     }
                     if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) {
-                        validateSchema(conf.getSchemaType(), typeArgs[0], name, clsLoader);
+                        validateSchema(conf.getSchemaType(), typeArgs[0], name, clsLoader, true);
                     }
                 });
             }
@@ -425,16 +425,17 @@ public class ValidatorImpls {
             }
 
             if (functionConfig.getOutputSchemaType() != null && !functionConfig.getOutputSchemaType().isEmpty()) {
-                validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], name, clsLoader);
+                validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], name, clsLoader, false);
             }
 
             if (functionConfig.getOutputSerdeClassName() != null && !functionConfig.getOutputSerdeClassName().isEmpty()) {
-                validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], name, clsLoader);
+                validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], name, clsLoader, false);
             }
 
         }
 
-        private static void validateSchema(String schemaType, Class<?> typeArg, String name, ClassLoader clsLoader) {
+        private static void validateSchema(String schemaType, Class<?> typeArg, String name, ClassLoader clsLoader,
+                                           boolean input) {
             if (StringUtils.isEmpty(schemaType) || getBuiltinSchemaType(schemaType) != null) {
                 // If it's empty, we use the default schema and no need to validate
                 // If it's built-in, no need to validate
@@ -447,11 +448,12 @@ public class ValidatorImpls {
                                     schemaType, Schema.class.getCanonicalName()));
                 }
 
-                validateSchemaType(schemaType, typeArg, clsLoader);
+                validateSchemaType(schemaType, typeArg, clsLoader, input);
             }
         }
 
-        private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader) {
+        private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader,
+                                          boolean deser) {
             if (StringUtils.isEmpty(inputSerializer)) return;
             Class<?> serdeClass;
             try {
@@ -492,8 +494,14 @@ public class ValidatorImpls {
                     throw new IllegalArgumentException("Failed to load type class", e);
                 }
 
-                if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                    throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+                if (deser) {
+                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+                        throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+                    }
+                } else {
+                    if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
+                        throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+                    }
                 }
             }
         }
@@ -734,10 +742,10 @@ public class ValidatorImpls {
                 }
 
                 if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
-                    FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader);
+                    FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader, false);
                 }
                 if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
-                    FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader);
+                    FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader, false);
                 }
             } catch (IOException e) {
                 throw new IllegalArgumentException(e);
@@ -774,13 +782,13 @@ public class ValidatorImpls {
 
                 if (sinkConfig.getTopicToSerdeClassName() != null) {
                     sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
-                        FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader);
+                        FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader, true);
                     });
                 }
 
                 if (sinkConfig.getTopicToSchemaType() != null) {
                     sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
-                        FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader);
+                        FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader, true);
                     });
                 }
 
@@ -794,10 +802,10 @@ public class ValidatorImpls {
                             throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
                         }
                         if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) {
-                            FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader);
+                            FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader, true);
                         }
                         if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
-                            FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader);
+                            FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader, true);
                         }
                     });
                 }
@@ -900,8 +908,8 @@ public class ValidatorImpls {
         }
     }
 
-    private static void validateSchemaType(String scheamType, Class<?> typeArg, ClassLoader clsLoader) {
-        validateCustomSchemaType(scheamType, typeArg, clsLoader);
+    private static void validateSchemaType(String scheamType, Class<?> typeArg, ClassLoader clsLoader, boolean input) {
+        validateCustomSchemaType(scheamType, typeArg, clsLoader, input);
     }
 
     private static void validateSerDeType(String serdeClassName, Class<?> typeArg, ClassLoader clsLoader) {
@@ -929,7 +937,8 @@ public class ValidatorImpls {
         }
     }
 
-    private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader) {
+    private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader,
+                                                 boolean input) {
         Schema<?> schema = (Schema<?>) Reflections.createInstance(schemaClassName, clsLoader);
         if (schema == null) {
             throw new IllegalArgumentException(String.format("The Schema class %s does not exist",
@@ -948,9 +957,16 @@ public class ValidatorImpls {
             throw new IllegalArgumentException("Failed to load type class", e);
         }
 
-        if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
-            throw new IllegalArgumentException(
-                    "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+        if (input) {
+            if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
+                throw new IllegalArgumentException(
+                        "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+            }
+        } else {
+            if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
+                throw new IllegalArgumentException(
+                        "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+            }
         }
     }
 }
\ No newline at end of file