You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/02/24 13:04:29 UTC

[pulsar] branch branch-2.7 updated: [Schema] Schema comparison logic change. (#9612)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 8939db1  [Schema] Schema comparison logic change. (#9612)
8939db1 is described below

commit 8939db1f2f7414999101439ba8108966ee428491
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Feb 23 21:24:14 2021 +0800

    [Schema] Schema comparison logic change. (#9612)
    
    * [Schema] Schema comparison logic change.
    
    * Add the test logic
    
    * change the compare logic
    
    * reimplement
    
    * Fix the test
    
    * Fix some comment
    
    * Change judge to switch.
    
    Co-authored-by: congbo <co...@github.com>
    (cherry picked from commit e01940dd4fc1e81e44c948e9235a37e780ca610a)
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  59 ++++--
 .../broker/service/schema/SchemaServiceTest.java   | 201 ++++++++-------------
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  17 +-
 .../SchemaCompatibilityCheckTest.java              |  52 ++++++
 4 files changed, 187 insertions(+), 142 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index d40c524..eb7f86e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.isNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
@@ -43,6 +44,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -142,17 +144,17 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
             if (schemaVersion != null) {
                 return CompletableFuture.completedFuture(schemaVersion);
             }
-            CompletableFuture<Void> checkCompatibilityFurture = new CompletableFuture<>();
+            CompletableFuture<Void> checkCompatibilityFuture = new CompletableFuture<>();
             if (schemaAndMetadataList.size() != 0) {
                 if (isTransitiveStrategy(strategy)) {
-                    checkCompatibilityFurture = checkCompatibilityWithAll(schema, strategy, schemaAndMetadataList);
+                    checkCompatibilityFuture = checkCompatibilityWithAll(schema, strategy, schemaAndMetadataList);
                 } else {
-                    checkCompatibilityFurture = checkCompatibilityWithLatest(schemaId, schema, strategy);
+                    checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, schema, strategy);
                 }
             } else {
-                checkCompatibilityFurture.complete(null);
+                checkCompatibilityFuture.complete(null);
             }
-            return checkCompatibilityFurture.thenCompose(v -> {
+            return checkCompatibilityFuture.thenCompose(v -> {
                 byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
                 SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
                         .setType(Functions.convertFromDomainType(schema.getType()))
@@ -291,12 +293,36 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
             SchemaData schemaData) {
         final CompletableFuture<SchemaVersion> completableFuture = new CompletableFuture<>();
         SchemaVersion schemaVersion;
-        for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
-            if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
-                    hashFunction.hashBytes(schemaData.getData()).asBytes())) {
-                schemaVersion = schemaAndMetadata.version;
-                completableFuture.complete(schemaVersion);
-                return completableFuture;
+        if (isUsingAvroSchemaParser(schemaData.getType())) {
+            Schema.Parser parser = new Schema.Parser();
+            Schema newSchema = parser.parse(new String(schemaData.getData(), UTF_8));
+
+            for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+                if (isUsingAvroSchemaParser(schemaData.getType())) {
+                    Schema.Parser existParser = new Schema.Parser();
+                    Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
+                    if (newSchema.equals(existSchema)) {
+                        schemaVersion = schemaAndMetadata.version;
+                        completableFuture.complete(schemaVersion);
+                        return completableFuture;
+                    }
+                } else {
+                    if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
+                            hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                        schemaVersion = schemaAndMetadata.version;
+                        completableFuture.complete(schemaVersion);
+                        return completableFuture;
+                    }
+                }
+            }
+        } else {
+            for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+                if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
+                        hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                    schemaVersion = schemaAndMetadata.version;
+                    completableFuture.complete(schemaVersion);
+                    return completableFuture;
+                }
             }
         }
         completableFuture.complete(null);
@@ -464,4 +490,15 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
         }
     }
 
+    public static boolean isUsingAvroSchemaParser(SchemaType type) {
+        switch (type) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF:
+                return true;
+            default:
+                return false;
+        }
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 6b2f192..a623f05 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -42,8 +42,6 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
-import org.apache.pulsar.common.protocol.schema.SchemaStorage;
-import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -53,37 +51,27 @@ import org.testng.annotations.Test;
 
 public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
 
-    private static Clock MockClock = Clock.fixed(Instant.EPOCH, ZoneId.systemDefault());
-
-    private String schemaId1 = "1/2/3/4";
-    private String userId = "user";
-
-    private SchemaData schema1 = SchemaData.builder()
-        .user(userId)
-        .type(SchemaType.JSON)
-        .timestamp(MockClock.millis())
-        .isDeleted(false)
-        .data("message { required int64 a = 1};".getBytes())
-        .props(new TreeMap<>())
-        .build();
-
-    private SchemaData schema2 = SchemaData.builder()
-        .user(userId)
-        .type(SchemaType.JSON)
-        .timestamp(MockClock.millis())
-        .isDeleted(false)
-        .data("message { required int64 b = 1};".getBytes())
-        .props(new TreeMap<>())
-        .build();
-
-    private SchemaData schema3 = SchemaData.builder()
-        .user(userId)
-        .type(SchemaType.JSON)
-        .timestamp(MockClock.millis())
-        .isDeleted(false)
-        .data("message { required int64 c = 1};".getBytes())
-        .props(new TreeMap<>())
-        .build();
+    private static final Clock MockClock = Clock.fixed(Instant.EPOCH, ZoneId.systemDefault());
+
+    private final String schemaId1 = "1/2/3/4";
+    private final static String userId = "user";
+
+    private final static String schemaJson1 =
+            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
+                    ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
+    private final static SchemaData schemaData1 = getSchemaData(schemaJson1);
+
+    private final static String schemaJson2 =
+            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
+                    ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
+                    "{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
+    private final static SchemaData schemaData2 = getSchemaData(schemaJson2);
+
+    private final static String schemaJson3 =
+            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
+                    ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
+                    "{\"name\":\"field2\",\"type\":\"string\"}]}";
+    private final static SchemaData schemaData3 = getSchemaData(schemaJson3);
 
     private SchemaRegistryServiceImpl schemaRegistryService;
 
@@ -109,10 +97,10 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void writeReadBackDeleteSchemaEntry() throws Exception {
-        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schemaData1, version(0));
 
         SchemaData latest = getLatestSchema(schemaId1, version(0));
-        assertEquals(schema1, latest);
+        assertEquals(schemaData1, latest);
 
         deleteSchema(schemaId1, version(1));
 
@@ -121,154 +109,129 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void findSchemaVersionTest() throws Exception {
-        putSchema(schemaId1, schema1, version(0));
-        assertEquals(0, schemaRegistryService.findSchemaVersion(schemaId1, schema1).get().longValue());
+        putSchema(schemaId1, schemaData1, version(0));
+        assertEquals(0, schemaRegistryService.findSchemaVersion(schemaId1, schemaData1).get().longValue());
     }
 
     @Test
     public void deleteSchemaAndAddSchema() throws Exception {
-        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schemaData1, version(0));
         SchemaData latest = getLatestSchema(schemaId1, version(0));
-        assertEquals(schema1, latest);
+        assertEquals(schemaData1, latest);
 
         deleteSchema(schemaId1, version(1));
 
         assertNull(schemaRegistryService.getSchema(schemaId1).get());
 
-        putSchema(schemaId1, schema1, version(2));
+        putSchema(schemaId1, schemaData1, version(2));
 
         latest = getLatestSchema(schemaId1, version(2));
-        assertEquals(schema1, latest);
+        assertEquals(schemaData1, latest);
 
     }
 
     @Test
     public void getReturnsTheLastWrittenEntry() throws Exception {
-        putSchema(schemaId1, schema1, version(0));
-        putSchema(schemaId1, schema2, version(1));
+        putSchema(schemaId1, schemaData1, version(0));
+        putSchema(schemaId1, schemaData2, version(1));
 
         SchemaData latest = getLatestSchema(schemaId1, version(1));
-        assertEquals(schema2, latest);
+        assertEquals(schemaData2, latest);
 
     }
 
     @Test
     public void getByVersionReturnsTheCorrectEntry() throws Exception {
-        putSchema(schemaId1, schema1, version(0));
-        putSchema(schemaId1, schema2, version(1));
+        putSchema(schemaId1, schemaData1, version(0));
+        putSchema(schemaId1, schemaData2, version(1));
 
         SchemaData version0 = getSchema(schemaId1, version(0));
-        assertEquals(schema1, version0);
+        assertEquals(schemaData1, version0);
     }
 
     @Test
     public void getByVersionReturnsTheCorrectEntry2() throws Exception {
-        putSchema(schemaId1, schema1, version(0));
-        putSchema(schemaId1, schema2, version(1));
+        putSchema(schemaId1, schemaData1, version(0));
+        putSchema(schemaId1, schemaData2, version(1));
 
         SchemaData version1 = getSchema(schemaId1, version(1));
-        assertEquals(schema2, version1);
+        assertEquals(schemaData2, version1);
     }
 
     @Test
     public void getByVersionReturnsTheCorrectEntry3() throws Exception {
-        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schemaData1, version(0));
 
         SchemaData version1 = getSchema(schemaId1, version(0));
-        assertEquals(schema1, version1);
+        assertEquals(schemaData1, version1);
     }
 
     @Test
     public void getAllVersionSchema() throws Exception {
-        putSchema(schemaId1, schema1, version(0));
-        putSchema(schemaId1, schema2, version(1));
-        putSchema(schemaId1, schema3, version(2));
+        putSchema(schemaId1, schemaData1, version(0));
+        putSchema(schemaId1, schemaData2, version(1));
+        putSchema(schemaId1, schemaData3, version(2));
 
         List<SchemaData> allSchemas = getAllSchemas(schemaId1);
-        assertEquals(schema1, allSchemas.get(0));
-        assertEquals(schema2, allSchemas.get(1));
-        assertEquals(schema3, allSchemas.get(2));
+        assertEquals(schemaData1, allSchemas.get(0));
+        assertEquals(schemaData2, allSchemas.get(1));
+        assertEquals(schemaData3, allSchemas.get(2));
     }
 
     @Test
     public void addLotsOfEntriesThenDelete() throws Exception {
-        SchemaData randomSchema1 = randomSchema();
-        SchemaData randomSchema2 = randomSchema();
-        SchemaData randomSchema3 = randomSchema();
-        SchemaData randomSchema4 = randomSchema();
-        SchemaData randomSchema5 = randomSchema();
-        SchemaData randomSchema6 = randomSchema();
-        SchemaData randomSchema7 = randomSchema();
-
-        putSchema(schemaId1, randomSchema1, version(0));
-        putSchema(schemaId1, randomSchema2, version(1));
-        putSchema(schemaId1, randomSchema3, version(2));
-        putSchema(schemaId1, randomSchema4, version(3));
-        putSchema(schemaId1, randomSchema5, version(4));
-        putSchema(schemaId1, randomSchema6, version(5));
-        putSchema(schemaId1, randomSchema7, version(6));
+
+        putSchema(schemaId1, schemaData1, version(0));
+        putSchema(schemaId1, schemaData2, version(1));
+        putSchema(schemaId1, schemaData3, version(2));
 
         SchemaData version0 = getSchema(schemaId1, version(0));
-        assertEquals(randomSchema1, version0);
+        assertEquals(schemaData1, version0);
 
         SchemaData version1 = getSchema(schemaId1, version(1));
-        assertEquals(randomSchema2, version1);
+        assertEquals(schemaData2, version1);
 
         SchemaData version2 = getSchema(schemaId1, version(2));
-        assertEquals(randomSchema3, version2);
-
-        SchemaData version3 = getSchema(schemaId1, version(3));
-        assertEquals(randomSchema4, version3);
+        assertEquals(schemaData3, version2);
 
-        SchemaData version4 = getSchema(schemaId1, version(4));
-        assertEquals(randomSchema5, version4);
+        deleteSchema(schemaId1, version(3));
 
-        SchemaData version5 = getSchema(schemaId1, version(5));
-        assertEquals(randomSchema6, version5);
-
-        SchemaData version6 = getSchema(schemaId1, version(6));
-        assertEquals(randomSchema7, version6);
-
-        deleteSchema(schemaId1, version(7));
-
-        SchemaRegistry.SchemaAndMetadata version7 = schemaRegistryService.getSchema(schemaId1, version(7)).get();
-        assertNull(version7);
+        SchemaRegistry.SchemaAndMetadata version3 = schemaRegistryService.getSchema(schemaId1, version(3)).get();
+        assertNull(version3);
 
     }
 
     @Test
     public void writeSchemasToDifferentIds() throws Exception {
-        SchemaData schemaWithDifferentId = schema3;
-
-        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schemaData1, version(0));
         String schemaId2 = "id2";
-        putSchema(schemaId2, schemaWithDifferentId, version(0));
+        putSchema(schemaId2, schemaData3, version(0));
 
         SchemaData withFirstId = getLatestSchema(schemaId1, version(0));
         SchemaData withDifferentId = getLatestSchema(schemaId2, version(0));
 
-        assertEquals(schema1, withFirstId);
-        assertEquals(schema3, withDifferentId);
+        assertEquals(schemaData1, withFirstId);
+        assertEquals(schemaData3, withDifferentId);
     }
 
     @Test
     public void dontReAddExistingSchemaAtRoot() throws Exception {
-        putSchema(schemaId1, schema1, version(0));
-        putSchema(schemaId1, schema1, version(0));
-        putSchema(schemaId1, schema1, version(0));
+        putSchema(schemaId1, schemaData1, version(0));
+        putSchema(schemaId1, schemaData1, version(0));
+        putSchema(schemaId1, schemaData1, version(0));
     }
 
     @Test
     public void trimDeletedSchemaAndGetListTest() throws Exception {
         List<SchemaAndMetadata> list = new ArrayList<>();
         CompletableFuture<SchemaVersion> put = schemaRegistryService.putSchemaIfAbsent(
-                schemaId1, schema1, SchemaCompatibilityStrategy.FULL);
+                schemaId1, schemaData1, SchemaCompatibilityStrategy.FULL);
         SchemaVersion newVersion = put.get();
-        list.add(new SchemaAndMetadata(schemaId1, schema1, newVersion));
+        list.add(new SchemaAndMetadata(schemaId1, schemaData1, newVersion));
         put = schemaRegistryService.putSchemaIfAbsent(
-                schemaId1, schema2, SchemaCompatibilityStrategy.FULL);
+                schemaId1, schemaData2, SchemaCompatibilityStrategy.FULL);
         newVersion = put.get();
-        list.add(new SchemaAndMetadata(schemaId1, schema2, newVersion));
+        list.add(new SchemaAndMetadata(schemaId1, schemaData2, newVersion));
         List<SchemaAndMetadata> list1 = schemaRegistryService.trimDeletedSchemaAndGetList(schemaId1).get();
         assertEquals(list.size(), list1.size());
         HashFunction hashFunction = Hashing.sha256();
@@ -285,34 +248,14 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void dontReAddExistingSchemaInMiddle() throws Exception {
-        putSchema(schemaId1, randomSchema(), version(0));
-        putSchema(schemaId1, schema2, version(1));
-        putSchema(schemaId1, randomSchema(), version(2));
-        putSchema(schemaId1, randomSchema(), version(3));
-        putSchema(schemaId1, randomSchema(), version(4));
-        putSchema(schemaId1, randomSchema(), version(5));
-        putSchema(schemaId1, schema2, version(1));
+        putSchema(schemaId1, schemaData1, version(0));
+        putSchema(schemaId1, schemaData2, version(1));
+        putSchema(schemaId1, schemaData3, version(2));
+        putSchema(schemaId1, schemaData2, version(1));
     }
 
     @Test(expectedExceptions = ExecutionException.class)
     public void checkIsCompatible() throws Exception {
-        String schemaJson1 =
-                "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
-                        ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
-        SchemaData schemaData1 = getSchemaData(schemaJson1);
-
-        String schemaJson2 =
-                "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
-                        ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
-                        "{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
-        SchemaData schemaData2 = getSchemaData(schemaJson2);
-
-        String schemaJson3 =
-                "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
-                        ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
-                        "{\"name\":\"field2\",\"type\":\"string\"}]}";
-        SchemaData schemaData3 = getSchemaData(schemaJson3);
-
         putSchema(schemaId1, schemaData1, version(0), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
         putSchema(schemaId1, schemaData2, version(1), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
 
@@ -381,7 +324,7 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
             .build();
     }
 
-    private SchemaData getSchemaData(String schemaJson) {
+    private static SchemaData getSchemaData(String schemaJson) {
         return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).user(userId).build();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index b8580fd..595da2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -21,12 +21,13 @@ package org.apache.pulsar.schema;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Collections;
 
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -37,6 +38,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -193,4 +195,15 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         consumer.close();
         consumer1.close();
     }
+
+    @Test
+    public void testIsUsingAvroSchemaParser() {
+        for (SchemaType value : SchemaType.values()) {
+            if (value == SchemaType.AVRO || value == SchemaType.JSON || value == SchemaType.PROTOBUF) {
+                assertTrue(SchemaRegistryServiceImpl.isUsingAvroSchemaParser(value));
+            } else {
+                assertFalse(SchemaRegistryServiceImpl.isUsingAvroSchemaParser(value));
+            }
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 37c1942..565aeb5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -35,6 +35,8 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.schema.Schemas;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -45,8 +47,11 @@ import org.testng.annotations.Test;
 import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 
 @Slf4j
@@ -294,6 +299,53 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
         producer.close();
     }
 
+    @Test
+    public void testSchemaComparison() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String topic = "test-schema-comparison";
+
+        String namespace = "test-namespace-" + randomName(16);
+        String fqtn = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topic
+        ).toString();
+
+        NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME)
+        );
+
+        assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
+                SchemaCompatibilityStrategy.FULL);
+        byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
+                .getSchemaInfo().getSchema(), UTF_8) + "/n   /n   /n").getBytes();
+        SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
+        admin.schemas().createSchema(fqtn, schemaInfo);
+
+        admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
+        ProducerBuilder<Schemas.PersonOne> producerOneBuilder = pulsarClient
+                .newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(fqtn);
+        producerOneBuilder.create().close();
+
+        assertArrayEquals(changeSchemaBytes, admin.schemas().getSchemaInfo(fqtn).getSchema());
+
+        ProducerBuilder<Schemas.PersonThree> producerThreeBuilder = pulsarClient
+                .newProducer(Schema.AVRO(Schemas.PersonThree.class))
+                .topic(fqtn);
+        
+        try {
+            producerThreeBuilder.create();
+            fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
+        }
+    }
+
     @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
     public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
         final String tenant = PUBLIC_TENANT;