You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/06/23 16:34:07 UTC
[pulsar] branch master updated: [fix][client] Add classLoader field for `SchemaDefinition` (#15915)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8434500b687 [fix][client] Add classLoader field for `SchemaDefinition` (#15915)
8434500b687 is described below
commit 8434500b6879abc9ab74de6e5b75883e8053fd9c
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Jun 24 00:33:59 2022 +0800
[fix][client] Add classLoader field for `SchemaDefinition` (#15915)
Fixes #15899
### Motivation
Now, don‘t register logical type conversions when use `SchemaDefinition.<T>builder().withJsonDef()` beacase it without classLoader param.
See:
https://github.com/apache/pulsar/blob/04aa9e8e51869d1621a7e25402a656084eebfc09/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java#L58-L68
We can add the classLoader field for `SchemaDefinition`, user can manually pass a classLoader to register logical type conversions
### Modifications
Add classLoader field for `SchemaDefinition`
---
.../pulsar/client/api/schema/SchemaDefinition.java | 7 ++
.../client/api/schema/SchemaDefinitionBuilder.java | 9 +++
.../pulsar/client/impl/schema/AvroSchema.java | 5 +-
.../impl/schema/SchemaDefinitionBuilderImpl.java | 15 ++++-
.../client/impl/schema/SchemaDefinitionImpl.java | 11 +++-
.../pulsar/client/impl/schema/AvroSchemaTest.java | 77 ++++++++++++++++++++--
.../client/impl/schema/SchemaBuilderTest.java | 8 ++-
7 files changed, 122 insertions(+), 10 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
index a6777c5c2fa..88dd3670608 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
@@ -75,6 +75,13 @@ public interface SchemaDefinition<T> {
*/
Class<T> getPojo();
+ /**
+ * Get pojo classLoader.
+ *
+ * @return pojo schema
+ */
+ ClassLoader getClassLoader();
+
/**
* Get supportSchemaVersioning schema definition.
*
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
index 61d246674a8..97d822b927d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
@@ -80,6 +80,15 @@ public interface SchemaDefinitionBuilder<T> {
*/
SchemaDefinitionBuilder<T> withPojo(Class pojo);
+ /**
+ * Set schema of pojo classLoader.
+ *
+ * @param classLoader pojo classLoader
+ *
+ * @return schema definition builder
+ */
+ SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader);
+
/**
* Set schema of json definition.
*
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 3d0bf157cb3..d2ea9cd4a9f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -89,9 +89,12 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
schemaDefinition.getSchemaWriterOpt().get(), parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
}
ClassLoader pojoClassLoader = null;
- if (schemaDefinition.getPojo() != null) {
+ if (schemaDefinition.getClassLoader() != null) {
+ pojoClassLoader = schemaDefinition.getClassLoader();
+ } else if (schemaDefinition.getPojo() != null) {
pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
}
+
return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
index fe85a55e117..06a2f50abd6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
@@ -40,6 +40,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
*/
private Class<T> clazz;
+ /**
+ * the classLoader definition class.
+ */
+ private ClassLoader classLoader;
+
/**
* The flag of schema type always allow null.
*
@@ -100,6 +105,12 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
return this;
}
+ @Override
+ public SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ return this;
+ }
+
@Override
public SchemaDefinitionBuilder<T> withJsonDef(String jsonDef) {
this.jsonDef = jsonDef;
@@ -149,8 +160,8 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull));
properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled));
- return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning,
- jsr310ConversionEnabled, reader, writer);
+ return new SchemaDefinitionImpl(clazz, jsonDef, classLoader,
+ alwaysAllowNull, properties, supportSchemaVersioning, jsr310ConversionEnabled, reader, writer);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
index d0db78963db..090211a63fe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
@@ -50,6 +50,8 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
private final String jsonDef;
+ private final ClassLoader classLoader;
+
private final boolean supportSchemaVersioning;
private final boolean jsr310ConversionEnabled;
@@ -58,13 +60,15 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
private final SchemaWriter<T> writer;
- public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String, String> properties,
+ public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, ClassLoader classLoader,
+ boolean alwaysAllowNull, Map<String, String> properties,
boolean supportSchemaVersioning, boolean jsr310ConversionEnabled,
SchemaReader<T> reader, SchemaWriter<T> writer) {
this.alwaysAllowNull = alwaysAllowNull;
this.properties = properties;
this.jsonDef = jsonDef;
this.pojo = pojo;
+ this.classLoader = classLoader;
this.supportSchemaVersioning = supportSchemaVersioning;
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
this.reader = reader;
@@ -104,6 +108,11 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
return pojo;
}
+ @Override
+ public ClassLoader getClassLoader() {
+ return this.classLoader;
+ }
+
@Override
public boolean getSupportSchemaVersioning() {
return supportSchemaVersioning;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index d69f8bf66ba..2a5040d7815 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -36,7 +36,9 @@ import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.UUID;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
@@ -463,21 +465,88 @@ public class AvroSchemaTest {
@Data
- private static class TimestampStruct {
+ @AllArgsConstructor
+ @NoArgsConstructor
+ private static class TimestampPojo {
Instant value;
}
@Test
public void testTimestampWithJsr310Conversion() {
- AvroSchema<TimestampStruct> schema = AvroSchema.of(TimestampStruct.class);
+ AvroSchema<TimestampPojo> schema = AvroSchema.of(TimestampPojo.class);
Assert.assertEquals(
schema.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
new TimeConversions.TimestampMicrosConversion().getLogicalTypeName());
- AvroSchema<TimestampStruct> schema2 = AvroSchema.of(SchemaDefinition.<TimestampStruct>builder()
- .withPojo(TimestampStruct.class).withJSR310ConversionEnabled(true).build());
+ AvroSchema<TimestampPojo> schema2 = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+ .withPojo(TimestampPojo.class).withJSR310ConversionEnabled(true).build());
Assert.assertEquals(
schema2.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
new TimeConversions.TimestampMillisConversion().getLogicalTypeName());
}
+
+ @Test
+ public void testTimestampWithJsonDef(){
+ AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+ .withPojo(TimestampPojo.class)
+ .withJSR310ConversionEnabled(false).build());
+
+ TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z"));
+ byte[] encode = schemaWithPojo.encode(timestampPojo);
+ TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode);
+
+ Assert.assertEquals(decodeWithPojo, timestampPojo);
+
+ String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema());
+ AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+ .withJsonDef(schemaDefinition)
+ .withClassLoader(TimestampPojo.class.getClassLoader())
+ .withJSR310ConversionEnabled(false).build());
+
+ TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode);
+
+ Assert.assertEquals(decodeWithJson, decodeWithPojo);
+ Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass());
+
+ AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+ .withJsonDef(schemaDefinition)
+ .withJSR310ConversionEnabled(false).build());
+
+ TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode);
+ Assert.assertNotEquals(decodeWithJsonNoClassLoader, decodeWithPojo);
+ Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass());
+ }
+
+ @Test
+ public void testTimestampWithJsonDefAndJSR310ConversionEnabled(){
+ AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+ .withPojo(TimestampPojo.class)
+ .withJSR310ConversionEnabled(true).build());
+
+ TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z"));
+ byte[] encode = schemaWithPojo.encode(timestampPojo);
+ TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode);
+
+ Assert.assertNotEquals(decodeWithPojo, timestampPojo);
+
+ String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema());
+ AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+ .withJsonDef(schemaDefinition)
+ .withClassLoader(TimestampPojo.class.getClassLoader())
+ .withJSR310ConversionEnabled(true).build());
+
+ TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode);
+
+ Assert.assertEquals(decodeWithJson, decodeWithPojo);
+ Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass());
+
+ AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+ .withJsonDef(schemaDefinition)
+ .withJSR310ConversionEnabled(true).build());
+
+ TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode);
+ Assert.assertNotEquals(decodeWithJsonNoClassLoader, decodeWithPojo);
+ Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass());
+ }
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index fa88e144a31..a1530864c92 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -20,11 +20,15 @@ package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
-
import lombok.Data;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.*;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;