You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/03/14 07:52:53 UTC

[pulsar] branch master updated: Support passing schema definition for JSON and AVRO schemas (#3766)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new da68b23  Support passing schema definition for JSON and AVRO schemas (#3766)
da68b23 is described below

commit da68b23cbaf1f618549fb01a3aae17e687eacbd2
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Thu Mar 14 15:52:49 2019 +0800

    Support passing schema definition for JSON and AVRO schemas (#3766)
    
    * Support passing schema definition for JSON and AVRO schemas
    
    *Motivation*
    
    Currently AVRO and JSON schemas are generated directly from the POJO.
    Sometimes people would like to use pre-generated/defined schemas,
    so allowing schema definitions to be passed in would clear up the
    confusion around parsing schemas from the POJO.
    
    *Modifications*
    
    - Abstract a common base class `StructSchema` for AVRO/PROTOBUF/JSON
    - Standardize on using avro schema for defining schema (we already did that; this change only makes it clearer)
    - Add methods to pass schema definition for JSON and AVRO schemas
    
    *NOTES*
    
    We don't support passing a schema definition for PROTOBUF. Since we only support generated messages as the POJO
    class for the protobuf schema, and we generate the schema definition from the generated messages, it doesn't make
    sense to pass in a different schema definition.
    
    * Add missing license header
---
 .../java/org/apache/pulsar/client/api/Schema.java  | 37 +++++++++++
 .../client/internal/DefaultImplementation.java     | 14 ++++
 .../pulsar/client/impl/schema/AvroSchema.java      | 43 ++++++-------
 .../pulsar/client/impl/schema/JSONSchema.java      | 51 ++++++++-------
 .../pulsar/client/impl/schema/ProtobufSchema.java  | 31 ++++-----
 .../pulsar/client/impl/schema/StructSchema.java    | 74 ++++++++++++++++++++++
 .../pulsar/client/impl/schema/AvroSchemaTest.java  | 74 ++++++++++++++++++++++
 7 files changed, 263 insertions(+), 61 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 5660773..e17fb06 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.api;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.internal.DefaultImplementation;
@@ -177,6 +179,27 @@ public interface Schema<T> {
     }
 
     /**
+     * Create an Avro schema type using the provided avro schema definition.
+     *
+     * @param schemaDefinition avro schema definition
+     * @return a Schema instance
+     */
+    static <T> Schema<T> AVRO(String schemaDefinition) {
+        return AVRO(schemaDefinition, Collections.emptyMap());
+    }
+
+    /**
+     * Create an Avro schema type using the provided avro schema definition.
+     *
+     * @param schemaDefinition avro schema definition
+     * @param properties pulsar schema properties
+     * @return a Schema instance
+     */
+    static <T> Schema<T> AVRO(String schemaDefinition, Map<String, String> properties) {
+        return DefaultImplementation.newAvroSchema(schemaDefinition, properties);
+    }
+
+    /**
      * Create a JSON schema type by extracting the fields of the specified class.
      *
      * @param clazz the POJO class to be used to extract the JSON schema
@@ -187,6 +210,20 @@ public interface Schema<T> {
     }
 
     /**
+     * Create a JSON schema type for the specified class using the provided schema definition.
+     *
+     * @param clazz the POJO class to be used to extract the JSON schema
+     * @param schemaDefinition schema definition json string (using avro schema syntax)
+     * @param properties pulsar schema properties
+     * @return a Schema instance
+     */
+    static <T> Schema<T> JSON(Class<T> clazz,
+                              String schemaDefinition,
+                              Map<String, String> properties) {
+        return DefaultImplementation.newJSONSchema(clazz, schemaDefinition, properties);
+    }
+
+    /**
      * Key Value Schema using passed in schema type, support JSON and AVRO currently.
      */
     static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value, SchemaType type) {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index 873b041..abd368e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -188,6 +188,12 @@ public class DefaultImplementation {
                         .invoke(null, clazz));
     }
 
+    public static <T> Schema<T> newAvroSchema(String schemaDefinition, Map<String, String> properties) {
+        return catchExceptions(
+                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", String.class, Map.class)
+                        .invoke(null, schemaDefinition, properties));
+    }
+
     public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
         return catchExceptions(
                 () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", Class.class)
@@ -200,6 +206,14 @@ public class DefaultImplementation {
                         .invoke(null, clazz));
     }
 
+    public static <T> Schema<T> newJSONSchema(Class<T> clazz,
+                                              String schemaDefinition,
+                                              Map<String, String> properties) {
+        return catchExceptions(
+                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", Class.class, String.class, Map.class)
+                        .invoke(null, clazz, schemaDefinition, properties));
+    }
+
     public static Schema<GenericRecord> newAutoConsumeSchema() {
         return catchExceptions(
                 () -> (Schema<GenericRecord>) newClassInstance("org.apache.pulsar.client.impl.schema.AutoConsumeSchema")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index c7e4a93..c9726aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -18,19 +18,14 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 import java.io.ByteArrayOutputStream;
@@ -38,11 +33,12 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 
+/**
+ * An AVRO schema implementation.
+ */
 @Slf4j
-public class AvroSchema<T> implements Schema<T> {
+public class AvroSchema<T> extends StructSchema<T> {
 
-    private SchemaInfo schemaInfo;
-    private org.apache.avro.Schema schema;
     private ReflectDatumWriter<T> datumWriter;
     private ReflectDatumReader<T> reader;
     private BinaryEncoder encoder;
@@ -53,13 +49,10 @@ public class AvroSchema<T> implements Schema<T> {
 
     private AvroSchema(org.apache.avro.Schema schema,
                        Map<String, String> properties) {
-        this.schema = schema;
-
-        this.schemaInfo = new SchemaInfo();
-        this.schemaInfo.setName("");
-        this.schemaInfo.setProperties(properties);
-        this.schemaInfo.setType(SchemaType.AVRO);
-        this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8));
+        super(
+            SchemaType.AVRO,
+            schema,
+            properties);
 
         this.byteArrayOutputStream = new ByteArrayOutputStream();
         this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
@@ -94,15 +87,6 @@ public class AvroSchema<T> implements Schema<T> {
         }
     }
 
-    @Override
-    public SchemaInfo getSchemaInfo() {
-        return this.schemaInfo;
-    }
-
-    private static <T> org.apache.avro.Schema createAvroSchema(Class<T> pojo) {
-        return ReflectData.AllowNull.get().getSchema(pojo);
-    }
-
     public static <T> AvroSchema<T> of(Class<T> pojo) {
         return new AvroSchema<>(createAvroSchema(pojo), Collections.emptyMap());
     }
@@ -111,4 +95,15 @@ public class AvroSchema<T> implements Schema<T> {
         return new AvroSchema<>(createAvroSchema(pojo), properties);
     }
 
+    /**
+     * Create an Avro schema based on provided schema definition.
+     *
+     * @param schemaDefinition avro schema definition
+     * @param properties schema properties
+     * @return avro schema instance
+     */
+    public static <T> AvroSchema<T> of(String schemaDefinition, Map<String, String> properties) {
+        return new AvroSchema<>(parseAvroSchema(schemaDefinition), properties);
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 3289584..b915ff2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -27,8 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -37,13 +33,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 
+/**
+ * A schema implementation to deal with json data.
+ */
 @Slf4j
-public class JSONSchema<T> implements Schema<T>{
-
-    private final org.apache.avro.Schema schema;
-    private final SchemaInfo schemaInfo;
-    private final Class<T> pojo;
-    private Map<String, String> properties;
+public class JSONSchema<T> extends StructSchema<T> {
 
     // Cannot use org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal() because it does not
     // return shaded version of object mapper
@@ -54,18 +48,17 @@ public class JSONSchema<T> implements Schema<T>{
         return mapper;
     });
 
+    private final Class<T> pojo;
     private final ObjectMapper objectMapper;
 
-    private JSONSchema(Class<T> pojo, Map<String, String> properties) {
+    private JSONSchema(Class<T> pojo,
+                       org.apache.avro.Schema schema,
+                       Map<String, String> properties) {
+        super(
+            SchemaType.JSON,
+            schema,
+            properties);
         this.pojo = pojo;
-        this.properties = properties;
-
-        this.schema = ReflectData.AllowNull.get().getSchema(pojo);
-        this.schemaInfo = new SchemaInfo();
-        this.schemaInfo.setName("");
-        this.schemaInfo.setProperties(properties);
-        this.schemaInfo.setType(SchemaType.JSON);
-        this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8));
         this.objectMapper = JSON_MAPPER.get();
     }
 
@@ -106,7 +99,7 @@ public class JSONSchema<T> implements Schema<T>{
             JsonSchema jsonBackwardsCompatibleSchema = schemaGen.generateSchema(pojo);
             backwardsCompatibleSchemaInfo = new SchemaInfo();
             backwardsCompatibleSchemaInfo.setName("");
-            backwardsCompatibleSchemaInfo.setProperties(properties);
+            backwardsCompatibleSchemaInfo.setProperties(schemaInfo.getProperties());
             backwardsCompatibleSchemaInfo.setType(SchemaType.JSON);
             backwardsCompatibleSchemaInfo.setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
         } catch (JsonProcessingException ex) {
@@ -116,10 +109,24 @@ public class JSONSchema<T> implements Schema<T>{
     }
 
     public static <T> JSONSchema<T> of(Class<T> pojo) {
-        return new JSONSchema<>(pojo, Collections.emptyMap());
+        return new JSONSchema<>(pojo, createAvroSchema(pojo), Collections.emptyMap());
     }
 
     public static <T> JSONSchema<T> of(Class<T> pojo, Map<String, String> properties) {
-        return new JSONSchema<>(pojo, properties);
+        return new JSONSchema<>(pojo, createAvroSchema(pojo), properties);
+    }
+
+    /**
+     * Create a json schema based on the provided schema definition.
+     *
+     * @param pojo pojo class
+     * @param schemaDefinition avro schema definition
+     * @param properties schema properties
+     * @return json schema instance
+     */
+    public static <T> JSONSchema<T> of(Class<T> pojo,
+                                       String schemaDefinition,
+                                       Map<String, String> properties) {
+        return new JSONSchema<>(pojo, parseAvroSchema(schemaDefinition), properties);
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index c7524e9..de61dca 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Descriptors;
@@ -27,7 +25,6 @@ import com.google.protobuf.Parser;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import org.apache.avro.protobuf.ProtobufDatumReader;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -40,9 +37,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
-public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> implements Schema<T> {
+/**
+ * A schema implementation to deal with protobuf generated messages.
+ */
+public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> extends StructSchema<T> {
 
-    private SchemaInfo schemaInfo;
     private Parser<T> tParser;
     public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
 
@@ -57,25 +56,27 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> im
         private final Map <String, Object> definition;
     }
 
+    private static <T> org.apache.avro.Schema createProtobufAvroSchema(Class<T> pojo) {
+        ProtobufDatumReader datumReader = new ProtobufDatumReader(pojo);
+        return datumReader.getSchema();
+    }
+
     private ProtobufSchema(Map<String, String> properties, Class<T> pojo) {
+        super(
+            SchemaType.PROTOBUF,
+            createProtobufAvroSchema(pojo),
+            properties);
+        // update properties with protobuf related properties
         try {
             T protoMessageInstance = (T) pojo.getMethod("getDefaultInstance").invoke(null);
             tParser = (Parser<T>) protoMessageInstance.getParserForType();
 
-            this.schemaInfo = new SchemaInfo();
-            this.schemaInfo.setName("");
-
             Map<String, String> allProperties = new HashMap<>();
-            allProperties.putAll(properties);
+            allProperties.putAll(schemaInfo.getProperties());
             // set protobuf parsing info
             allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
 
-            this.schemaInfo.setProperties(allProperties);
-            this.schemaInfo.setType(SchemaType.PROTOBUF);
-            ProtobufDatumReader datumReader = new ProtobufDatumReader(pojo);
-            org.apache.avro.Schema schema = datumReader.getSchema();
-            this.schemaInfo.setSchema(schema.toString().getBytes(UTF_8));
-
+            schemaInfo.setProperties(allProperties);
         } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
             throw new IllegalArgumentException(e);
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
new file mode 100644
index 0000000..33ce9de
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Map;
+import org.apache.avro.Schema.Parser;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * This is a base schema implementation for `Struct` types.
+ * A struct type is used for presenting records (objects) which
+ * have multiple fields.
+ *
+ * <p>Currently Pulsar supports 3 `Struct` types -
+ * {@link org.apache.pulsar.common.schema.SchemaType#AVRO},
+ * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
+ * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
+ */
+abstract class StructSchema<T> implements Schema<T> {
+
+    protected final org.apache.avro.Schema schema;
+    protected final SchemaInfo schemaInfo;
+
+    protected StructSchema(SchemaType schemaType,
+                           org.apache.avro.Schema schema,
+                           Map<String, String> properties) {
+        this.schema = schema;
+        this.schemaInfo = new SchemaInfo();
+        this.schemaInfo.setName("");
+        this.schemaInfo.setType(schemaType);
+        this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8));
+        this.schemaInfo.setProperties(properties);
+    }
+
+    protected org.apache.avro.Schema getAvroSchema() {
+        return schema;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return this.schemaInfo;
+    }
+
+    protected static <T> org.apache.avro.Schema createAvroSchema(Class<T> pojo) {
+        return ReflectData.AllowNull.get().getSchema(pojo);
+    }
+
+    protected static org.apache.avro.Schema parseAvroSchema(String definition) {
+        Parser parser = new Parser();
+        return parser.parse(definition);
+    }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index e00e1a6..203c226 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -18,12 +18,25 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
 import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.fail;
 
+import java.util.Arrays;
+import java.util.Collections;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidationException;
+import org.apache.avro.SchemaValidator;
+import org.apache.avro.SchemaValidatorBuilder;
+import org.apache.avro.reflect.AvroDefault;
+import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -33,6 +46,67 @@ import org.testng.annotations.Test;
 @Slf4j
 public class AvroSchemaTest {
 
+    @Data
+    private static class DefaultStruct {
+        int field1;
+        String field2;
+        Long field3;
+    }
+
+    @Data
+    private static class StructWithAnnotations {
+        int field1;
+        @Nullable
+        String field2;
+        @AvroDefault("\"1000\"")
+        Long field3;
+    }
+
+    @Test
+    public void testSchemaDefinition() throws SchemaValidationException {
+        org.apache.avro.Schema schema1 = ReflectData.get().getSchema(DefaultStruct.class);
+        AvroSchema<StructWithAnnotations> schema2 = AvroSchema.of(StructWithAnnotations.class);
+
+        String schemaDef1 = schema1.toString();
+        String schemaDef2 = new String(schema2.getSchemaInfo().getSchema(), UTF_8);
+        assertNotEquals(
+            schemaDef1, schemaDef2,
+            "schema1 = " + schemaDef1 + ", schema2 = " + schemaDef2);
+
+        SchemaValidator validator = new SchemaValidatorBuilder()
+            .mutualReadStrategy()
+            .validateLatest();
+        try {
+            validator.validate(
+                schema1,
+                Arrays.asList(
+                    new Schema.Parser().parse(schemaDef2)
+                )
+            );
+            fail("Should fail on validating incompatible schemas");
+        } catch (SchemaValidationException sve) {
+            // expected
+        }
+
+        AvroSchema<StructWithAnnotations> schema3 = AvroSchema.of(schemaDef1, Collections.emptyMap());
+        String schemaDef3 = new String(schema3.getSchemaInfo().getSchema(), UTF_8);
+        assertEquals(schemaDef1, schemaDef3);
+        assertNotEquals(schemaDef2, schemaDef3);
+
+        StructWithAnnotations struct = new StructWithAnnotations();
+        struct.setField1(5678);
+        // schema2 is using the schema generated from POJO,
+        // it allows field2 to be nullable, and field3 has default value.
+        schema2.encode(struct);
+        try {
+            // schema3 is using the schema passed in, which doesn't allow nullable
+            schema3.encode(struct);
+            fail("Should fail to write the record since the provided schema is incompatible");
+        } catch (SchemaSerializationException sse) {
+            // expected
+        }
+    }
+
     @Test
     public void testSchema() {
         AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class);