You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:26 UTC

[pulsar] 21/29: [fix][client] Add classLoader field for `SchemaDefinition` (#15915)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea20a896b408d052fe0ef366c4a052b6ccfb6c72
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Jun 24 00:33:59 2022 +0800

    [fix][client] Add classLoader field for `SchemaDefinition` (#15915)
    
    Fixes #15899
    
    ### Motivation
    
    Currently, logical type conversions are not registered when using `SchemaDefinition.<T>builder().withJsonDef()`, because no classLoader param is passed in that case.
    
    See:
    https://github.com/apache/pulsar/blob/04aa9e8e51869d1621a7e25402a656084eebfc09/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java#L58-L68
    
    We can add a classLoader field to `SchemaDefinition` so that users can manually pass a classLoader to register logical type conversions.
    
    ### Modifications
    
    Add classLoader field for `SchemaDefinition`
    
    (cherry picked from commit 8434500b6879abc9ab74de6e5b75883e8053fd9c)
---
 .../pulsar/client/api/schema/SchemaDefinition.java |  7 ++
 .../client/api/schema/SchemaDefinitionBuilder.java |  9 +++
 .../pulsar/client/impl/schema/AvroSchema.java      |  5 +-
 .../impl/schema/SchemaDefinitionBuilderImpl.java   | 15 ++++-
 .../client/impl/schema/SchemaDefinitionImpl.java   | 11 +++-
 .../pulsar/client/impl/schema/AvroSchemaTest.java  | 77 ++++++++++++++++++++--
 .../client/impl/schema/SchemaBuilderTest.java      |  8 ++-
 7 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
index a6777c5c2fa..88dd3670608 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
@@ -75,6 +75,13 @@ public interface SchemaDefinition<T> {
      */
     Class<T> getPojo();
 
+    /**
+     * Get pojo classLoader.
+     *
+     * @return pojo classLoader
+     */
+    ClassLoader getClassLoader();
+
     /**
      * Get supportSchemaVersioning schema definition.
      *
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
index 61d246674a8..97d822b927d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
@@ -80,6 +80,15 @@ public interface SchemaDefinitionBuilder<T> {
      */
     SchemaDefinitionBuilder<T> withPojo(Class pojo);
 
+    /**
+     * Set schema of pojo classLoader.
+     *
+     * @param classLoader pojo classLoader
+     *
+     * @return schema definition builder
+     */
+    SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader);
+
     /**
      * Set schema of json definition.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 3d0bf157cb3..d2ea9cd4a9f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -89,9 +89,12 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
                     schemaDefinition.getSchemaWriterOpt().get(), parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
         }
         ClassLoader pojoClassLoader = null;
-        if (schemaDefinition.getPojo() != null) {
+        if (schemaDefinition.getClassLoader() != null) {
+            pojoClassLoader = schemaDefinition.getClassLoader();
+        } else if (schemaDefinition.getPojo() != null) {
             pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
         }
+
         return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
index fe85a55e117..06a2f50abd6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
@@ -40,6 +40,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
      */
     private  Class<T> clazz;
 
+    /**
+     * the classLoader definition class.
+     */
+    private ClassLoader classLoader;
+
     /**
      * The flag of schema type always allow null.
      *
@@ -100,6 +105,12 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
         return this;
     }
 
+    @Override
+    public SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader) {
+        this.classLoader = classLoader;
+        return this;
+    }
+
     @Override
     public SchemaDefinitionBuilder<T> withJsonDef(String jsonDef) {
         this.jsonDef = jsonDef;
@@ -149,8 +160,8 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
 
         properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull));
         properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled));
-        return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning,
-                jsr310ConversionEnabled, reader, writer);
+        return new SchemaDefinitionImpl(clazz, jsonDef, classLoader,
+                alwaysAllowNull, properties, supportSchemaVersioning, jsr310ConversionEnabled, reader, writer);
 
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
index d0db78963db..090211a63fe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
@@ -50,6 +50,8 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
 
     private final String jsonDef;
 
+    private final ClassLoader classLoader;
+
     private final boolean supportSchemaVersioning;
 
     private final boolean jsr310ConversionEnabled;
@@ -58,13 +60,15 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
 
     private final SchemaWriter<T> writer;
 
-    public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String, String> properties,
+    public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, ClassLoader classLoader,
+                                boolean alwaysAllowNull, Map<String, String> properties,
                                 boolean supportSchemaVersioning, boolean jsr310ConversionEnabled,
                                 SchemaReader<T> reader, SchemaWriter<T> writer) {
         this.alwaysAllowNull = alwaysAllowNull;
         this.properties = properties;
         this.jsonDef = jsonDef;
         this.pojo = pojo;
+        this.classLoader = classLoader;
         this.supportSchemaVersioning = supportSchemaVersioning;
         this.jsr310ConversionEnabled = jsr310ConversionEnabled;
         this.reader = reader;
@@ -104,6 +108,11 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
         return pojo;
     }
 
+    @Override
+    public ClassLoader getClassLoader() {
+        return this.classLoader;
+    }
+
     @Override
     public boolean getSupportSchemaVersioning() {
         return supportSchemaVersioning;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index d69f8bf66ba..2a5040d7815 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -36,7 +36,9 @@ import java.time.LocalTime;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.UUID;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaValidationException;
@@ -463,21 +465,88 @@ public class AvroSchemaTest {
 
 
     @Data
-    private static class TimestampStruct {
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class TimestampPojo {
         Instant value;
     }
 
     @Test
     public void testTimestampWithJsr310Conversion() {
-        AvroSchema<TimestampStruct> schema = AvroSchema.of(TimestampStruct.class);
+        AvroSchema<TimestampPojo> schema = AvroSchema.of(TimestampPojo.class);
         Assert.assertEquals(
                 schema.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
                 new TimeConversions.TimestampMicrosConversion().getLogicalTypeName());
 
-        AvroSchema<TimestampStruct> schema2 = AvroSchema.of(SchemaDefinition.<TimestampStruct>builder()
-                .withPojo(TimestampStruct.class).withJSR310ConversionEnabled(true).build());
+        AvroSchema<TimestampPojo> schema2 = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withPojo(TimestampPojo.class).withJSR310ConversionEnabled(true).build());
         Assert.assertEquals(
                 schema2.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
                 new TimeConversions.TimestampMillisConversion().getLogicalTypeName());
     }
+
+    @Test
+    public void testTimestampWithJsonDef(){
+        AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withPojo(TimestampPojo.class)
+                .withJSR310ConversionEnabled(false).build());
+
+        TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z"));
+        byte[] encode = schemaWithPojo.encode(timestampPojo);
+        TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode);
+
+        Assert.assertEquals(decodeWithPojo, timestampPojo);
+
+        String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema());
+        AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withJsonDef(schemaDefinition)
+                .withClassLoader(TimestampPojo.class.getClassLoader())
+                .withJSR310ConversionEnabled(false).build());
+
+        TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode);
+
+        Assert.assertEquals(decodeWithJson, decodeWithPojo);
+        Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass());
+
+        AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withJsonDef(schemaDefinition)
+                .withJSR310ConversionEnabled(false).build());
+
+        TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode);
+        Assert.assertNotEquals(decodeWithJsonNoClassLoader, decodeWithPojo);
+        Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass());
+    }
+
+    @Test
+    public void testTimestampWithJsonDefAndJSR310ConversionEnabled(){
+        AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withPojo(TimestampPojo.class)
+                .withJSR310ConversionEnabled(true).build());
+
+        TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z"));
+        byte[] encode = schemaWithPojo.encode(timestampPojo);
+        TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode);
+
+        Assert.assertNotEquals(decodeWithPojo, timestampPojo);
+
+        String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema());
+        AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withJsonDef(schemaDefinition)
+                .withClassLoader(TimestampPojo.class.getClassLoader())
+                .withJSR310ConversionEnabled(true).build());
+
+        TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode);
+
+        Assert.assertEquals(decodeWithJson, decodeWithPojo);
+        Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass());
+
+        AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withJsonDef(schemaDefinition)
+                .withJSR310ConversionEnabled(true).build());
+
+        TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode);
+        Assert.assertNotEquals(decodeWithJsonNoClassLoader, decodeWithPojo);
+        Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass());
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index fa88e144a31..a1530864c92 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -20,11 +20,15 @@ package org.apache.pulsar.client.impl.schema;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
-
 import lombok.Data;
 import org.apache.avro.reflect.Nullable;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.*;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;