You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/09/06 18:15:51 UTC
[incubator-pulsar] branch master updated: Fix validator logic to
differentiate between serializer and deserializer (#2523)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e4aea8c Fix validator logic to differentiate between serializer and deserializer (#2523)
e4aea8c is described below
commit e4aea8cd04f8052ff91ba50294b25d01d8943d99
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Sep 6 11:15:49 2018 -0700
Fix validator logic to differentiate between serializer and deserializer (#2523)
* Fix validator logic to differentiate between serializer and deserializer
* Expand to include backend and schema
* Fix build
* Fix unittest
* Fixed unittest
---
.../pulsar/functions/instance/ContextImpl.java | 4 +-
.../pulsar/functions/instance/InstanceUtils.java | 37 +++++++++----
.../apache/pulsar/functions/sink/PulsarSink.java | 4 +-
.../pulsar/functions/source/PulsarSource.java | 4 +-
.../pulsar/functions/source/TopicSchema.java | 18 +++----
.../pulsar/functions/sink/PulsarSinkTest.java | 2 +-
.../pulsar/functions/source/PulsarSourceTest.java | 2 +-
.../functions/utils/validation/ValidatorImpls.java | 62 ++++++++++++++--------
8 files changed, 80 insertions(+), 53 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index aec52fc..c4099f4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -254,13 +254,13 @@ class ContextImpl implements Context, SinkContext, SourceContext {
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object) {
- return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object));
+ return publish(topicName, object, "");
}
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
- return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName));
+ return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
}
@SuppressWarnings("unchecked")
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index b4a9bf9..86a9aa2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -31,27 +31,42 @@ import net.jodah.typetools.TypeResolver;
@UtilityClass
public class InstanceUtils {
- public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg) {
+ public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg,
+ boolean deser) {
SerDe<?> serDe = createInstance(serdeClassName, clsLoader, SerDe.class);
Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
- checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
- "Inconsistent types found between function input type and input serde type: "
- + " function type = " + typeArg + " should be assignable from "
- + inputSerdeTypeArgs[0]);
+ if (deser) {
+ checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+ "Inconsistent types found between function input type and serde type: "
+ + " function type = " + typeArg + " should be assignable from "
+ + inputSerdeTypeArgs[0]);
+ } else {
+ checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+ "Inconsistent types found between function input type and serde type: "
+ + " serde type = " + inputSerdeTypeArgs[0] + " should be assignable from "
+ + typeArg);
+ }
return serDe;
}
- public static Schema<?> initializeCustomSchema(String schemaClassName, ClassLoader clsLoader, Class<?> typeArg) {
+ public static Schema<?> initializeCustomSchema(String schemaClassName, ClassLoader clsLoader, Class<?> typeArg,
+ boolean input) {
Schema<?> schema = createInstance(schemaClassName, clsLoader, Schema.class);
Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
- checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
- "Inconsistent types found between function input type and input schema type: "
- + " function type = " + typeArg + " should be assignable from "
- + inputSerdeTypeArgs[0]);
-
+ if (input) {
+ checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+ "Inconsistent types found between function type and schema type: "
+ + " function type = " + typeArg + " should be assignable from "
+ + inputSerdeTypeArgs[0]);
+ } else {
+ checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+ "Inconsistent types found between function type and schema type: "
+ + " schema type = " + inputSerdeTypeArgs[0] + " should be assignable from "
+ + typeArg);
+ }
return schema;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 5ec725c..835e288 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -277,10 +277,10 @@ public class PulsarSink<T> implements Sink<T> {
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
- pulsarSinkConfig.getSchemaType());
+ pulsarSinkConfig.getSchemaType(), false);
} else {
return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
- pulsarSinkConfig.getSerdeClassName());
+ pulsarSinkConfig.getSerdeClassName(), false);
}
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 244ab70..e1059f3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -162,9 +162,9 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> {
Schema<T> schema;
if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
- schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName());
+ schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true);
} else {
- schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType());
+ schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true);
}
configs.put(topic,
ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).build());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 1802ee5..2ac5b65 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -51,16 +51,12 @@ public class TopicSchema {
public static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";
- public Schema<?> getSchema(String topic, Object object) {
- return getSchema(topic, object.getClass(), "");
+ public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName, boolean input) {
+ return getSchema(topic, object.getClass(), schemaTypeOrClassName, input);
}
- public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName) {
- return getSchema(topic, object.getClass(), schemaTypeOrClassName);
- }
-
- public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName) {
- return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName));
+ public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName, boolean input) {
+ return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName, input));
}
public Schema<?> getSchema(String topic, Class<?> clazz, Optional<SchemaType> schemaType) {
@@ -134,7 +130,7 @@ public class TopicSchema {
}
@SuppressWarnings("unchecked")
- private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName) {
+ private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
// The schemaTypeOrClassName can represent multiple thing, either a schema type, a schema class name or a ser-de
// class name.
@@ -161,11 +157,11 @@ public class TopicSchema {
// First try with Schema
try {
return (Schema<T>) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
- Thread.currentThread().getContextClassLoader(), clazz);
+ Thread.currentThread().getContextClassLoader(), clazz, input);
} catch (Throwable t) {
// Now try with Serde or just fail
SerDe<T> serDe = (SerDe<T>) InstanceUtils.initializeSerDe(schemaTypeOrClassName,
- Thread.currentThread().getContextClassLoader(), clazz);
+ Thread.currentThread().getContextClassLoader(), clazz, input);
return new SerDeSchema<>(serDe);
}
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 766dee3..72e3c56 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -140,7 +140,7 @@ public class PulsarSinkTest {
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (RuntimeException ex) {
log.error("RuntimeException: {}", ex, ex);
- assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:"));
+ assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and serde type:"));
} catch (Exception ex) {
log.error("Exception: {}", ex, ex);
assertTrue(false);
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 60f684f..e4825f2 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -159,7 +159,7 @@ public class PulsarSourceTest {
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (RuntimeException ex) {
log.error("RuntimeException: {}", ex, ex);
- assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:"));
+ assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and serde type:"));
} catch (Exception ex) {
log.error("Exception: {}", ex, ex);
assertTrue(false);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index f264d2a..f60f3c0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -381,7 +381,7 @@ public class ValidatorImpls {
// implements SerDe class
if (functionConfig.getCustomSerdeInputs() != null) {
functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
- validateSerde(inputSerializer, typeArgs[0], name, clsLoader);
+ validateSerde(inputSerializer, typeArgs[0], name, clsLoader, true);
});
}
@@ -389,7 +389,7 @@ public class ValidatorImpls {
// implements SerDe class
if (functionConfig.getCustomSchemaInputs() != null) {
functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> {
- validateSchema(schemaType, typeArgs[0], name, clsLoader);
+ validateSchema(schemaType, typeArgs[0], name, clsLoader, true);
});
}
@@ -405,10 +405,10 @@ public class ValidatorImpls {
String.format("Only one of schemaType or serdeClassName should be set in inputSpec"));
}
if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
- validateSerde(conf.getSerdeClassName(), typeArgs[0], name, clsLoader);
+ validateSerde(conf.getSerdeClassName(), typeArgs[0], name, clsLoader, true);
}
if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) {
- validateSchema(conf.getSchemaType(), typeArgs[0], name, clsLoader);
+ validateSchema(conf.getSchemaType(), typeArgs[0], name, clsLoader, true);
}
});
}
@@ -425,16 +425,17 @@ public class ValidatorImpls {
}
if (functionConfig.getOutputSchemaType() != null && !functionConfig.getOutputSchemaType().isEmpty()) {
- validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], name, clsLoader);
+ validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], name, clsLoader, false);
}
if (functionConfig.getOutputSerdeClassName() != null && !functionConfig.getOutputSerdeClassName().isEmpty()) {
- validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], name, clsLoader);
+ validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], name, clsLoader, false);
}
}
- private static void validateSchema(String schemaType, Class<?> typeArg, String name, ClassLoader clsLoader) {
+ private static void validateSchema(String schemaType, Class<?> typeArg, String name, ClassLoader clsLoader,
+ boolean input) {
if (StringUtils.isEmpty(schemaType) || getBuiltinSchemaType(schemaType) != null) {
// If it's empty, we use the default schema and no need to validate
// If it's built-in, no need to validate
@@ -447,11 +448,12 @@ public class ValidatorImpls {
schemaType, Schema.class.getCanonicalName()));
}
- validateSchemaType(schemaType, typeArg, clsLoader);
+ validateSchemaType(schemaType, typeArg, clsLoader, input);
}
}
- private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader) {
+ private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader,
+ boolean deser) {
if (StringUtils.isEmpty(inputSerializer)) return;
Class<?> serdeClass;
try {
@@ -492,8 +494,14 @@ public class ValidatorImpls {
throw new IllegalArgumentException("Failed to load type class", e);
}
- if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
- throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+ if (deser) {
+ if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+ throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+ }
+ } else {
+ if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
+ throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+ }
}
}
}
@@ -734,10 +742,10 @@ public class ValidatorImpls {
}
if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
- FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader);
+ FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader, false);
}
if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
- FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader);
+ FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader, false);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
@@ -774,13 +782,13 @@ public class ValidatorImpls {
if (sinkConfig.getTopicToSerdeClassName() != null) {
sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
- FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader);
+ FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader, true);
});
}
if (sinkConfig.getTopicToSchemaType() != null) {
sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
- FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader);
+ FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader, true);
});
}
@@ -794,10 +802,10 @@ public class ValidatorImpls {
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
}
if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) {
- FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader);
+ FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader, true);
}
if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
- FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader);
+ FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader, true);
}
});
}
@@ -900,8 +908,8 @@ public class ValidatorImpls {
}
}
- private static void validateSchemaType(String scheamType, Class<?> typeArg, ClassLoader clsLoader) {
- validateCustomSchemaType(scheamType, typeArg, clsLoader);
+ private static void validateSchemaType(String scheamType, Class<?> typeArg, ClassLoader clsLoader, boolean input) {
+ validateCustomSchemaType(scheamType, typeArg, clsLoader, input);
}
private static void validateSerDeType(String serdeClassName, Class<?> typeArg, ClassLoader clsLoader) {
@@ -929,7 +937,8 @@ public class ValidatorImpls {
}
}
- private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader) {
+ private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader,
+ boolean input) {
Schema<?> schema = (Schema<?>) Reflections.createInstance(schemaClassName, clsLoader);
if (schema == null) {
throw new IllegalArgumentException(String.format("The Schema class %s does not exist",
@@ -948,9 +957,16 @@ public class ValidatorImpls {
throw new IllegalArgumentException("Failed to load type class", e);
}
- if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
- throw new IllegalArgumentException(
- "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+ if (input) {
+ if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
+ throw new IllegalArgumentException(
+ "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+ }
+ } else {
+ if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
+ throw new IllegalArgumentException(
+ "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+ }
}
}
}
\ No newline at end of file