You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/02/24 13:04:29 UTC
[pulsar] branch branch-2.7 updated: [Schema] Schema comparison logic change. (#9612)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 8939db1 [Schema] Schema comparison logic change. (#9612)
8939db1 is described below
commit 8939db1f2f7414999101439ba8108966ee428491
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Feb 23 21:24:14 2021 +0800
[Schema] Schema comparison logic change. (#9612)
* [Schema] Schema comparison logic change.
* Add the test logic
* change the compare logic
* reimplement
* Fix the test
* Fix some comment
* Change judge to switch.
Co-authored-by: congbo <co...@github.com>
(cherry picked from commit e01940dd4fc1e81e44c948e9235a37e780ca610a)
---
.../service/schema/SchemaRegistryServiceImpl.java | 59 ++++--
.../broker/service/schema/SchemaServiceTest.java | 201 ++++++++-------------
.../java/org/apache/pulsar/schema/SchemaTest.java | 17 +-
.../SchemaCompatibilityCheckTest.java | 52 ++++++
4 files changed, 187 insertions(+), 142 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index d40c524..eb7f86e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.schema;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
@@ -43,6 +44,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -142,17 +144,17 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
if (schemaVersion != null) {
return CompletableFuture.completedFuture(schemaVersion);
}
- CompletableFuture<Void> checkCompatibilityFurture = new CompletableFuture<>();
+ CompletableFuture<Void> checkCompatibilityFuture = new CompletableFuture<>();
if (schemaAndMetadataList.size() != 0) {
if (isTransitiveStrategy(strategy)) {
- checkCompatibilityFurture = checkCompatibilityWithAll(schema, strategy, schemaAndMetadataList);
+ checkCompatibilityFuture = checkCompatibilityWithAll(schema, strategy, schemaAndMetadataList);
} else {
- checkCompatibilityFurture = checkCompatibilityWithLatest(schemaId, schema, strategy);
+ checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, schema, strategy);
}
} else {
- checkCompatibilityFurture.complete(null);
+ checkCompatibilityFuture.complete(null);
}
- return checkCompatibilityFurture.thenCompose(v -> {
+ return checkCompatibilityFuture.thenCompose(v -> {
byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
@@ -291,12 +293,36 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
SchemaData schemaData) {
final CompletableFuture<SchemaVersion> completableFuture = new CompletableFuture<>();
SchemaVersion schemaVersion;
- for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
- if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
- hashFunction.hashBytes(schemaData.getData()).asBytes())) {
- schemaVersion = schemaAndMetadata.version;
- completableFuture.complete(schemaVersion);
- return completableFuture;
+ if (isUsingAvroSchemaParser(schemaData.getType())) {
+ Schema.Parser parser = new Schema.Parser();
+ Schema newSchema = parser.parse(new String(schemaData.getData(), UTF_8));
+
+ for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+ if (isUsingAvroSchemaParser(schemaData.getType())) {
+ Schema.Parser existParser = new Schema.Parser();
+ Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
+ if (newSchema.equals(existSchema)) {
+ schemaVersion = schemaAndMetadata.version;
+ completableFuture.complete(schemaVersion);
+ return completableFuture;
+ }
+ } else {
+ if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
+ hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+ schemaVersion = schemaAndMetadata.version;
+ completableFuture.complete(schemaVersion);
+ return completableFuture;
+ }
+ }
+ }
+ } else {
+ for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+ if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
+ hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+ schemaVersion = schemaAndMetadata.version;
+ completableFuture.complete(schemaVersion);
+ return completableFuture;
+ }
}
}
completableFuture.complete(null);
@@ -464,4 +490,15 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
}
}
+ public static boolean isUsingAvroSchemaParser(SchemaType type) {
+ switch (type) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF:
+ return true;
+ default:
+ return false;
+ }
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 6b2f192..a623f05 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -42,8 +42,6 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
-import org.apache.pulsar.common.protocol.schema.SchemaStorage;
-import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -53,37 +51,27 @@ import org.testng.annotations.Test;
public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
- private static Clock MockClock = Clock.fixed(Instant.EPOCH, ZoneId.systemDefault());
-
- private String schemaId1 = "1/2/3/4";
- private String userId = "user";
-
- private SchemaData schema1 = SchemaData.builder()
- .user(userId)
- .type(SchemaType.JSON)
- .timestamp(MockClock.millis())
- .isDeleted(false)
- .data("message { required int64 a = 1};".getBytes())
- .props(new TreeMap<>())
- .build();
-
- private SchemaData schema2 = SchemaData.builder()
- .user(userId)
- .type(SchemaType.JSON)
- .timestamp(MockClock.millis())
- .isDeleted(false)
- .data("message { required int64 b = 1};".getBytes())
- .props(new TreeMap<>())
- .build();
-
- private SchemaData schema3 = SchemaData.builder()
- .user(userId)
- .type(SchemaType.JSON)
- .timestamp(MockClock.millis())
- .isDeleted(false)
- .data("message { required int64 c = 1};".getBytes())
- .props(new TreeMap<>())
- .build();
+ private static final Clock MockClock = Clock.fixed(Instant.EPOCH, ZoneId.systemDefault());
+
+ private final String schemaId1 = "1/2/3/4";
+ private final static String userId = "user";
+
+ private final static String schemaJson1 =
+ "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
+ ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
+ private final static SchemaData schemaData1 = getSchemaData(schemaJson1);
+
+ private final static String schemaJson2 =
+ "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
+ ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
+ "{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
+ private final static SchemaData schemaData2 = getSchemaData(schemaJson2);
+
+ private final static String schemaJson3 =
+ "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
+ ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
+ "{\"name\":\"field2\",\"type\":\"string\"}]}";
+ private final static SchemaData schemaData3 = getSchemaData(schemaJson3);
private SchemaRegistryServiceImpl schemaRegistryService;
@@ -109,10 +97,10 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
@Test
public void writeReadBackDeleteSchemaEntry() throws Exception {
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
SchemaData latest = getLatestSchema(schemaId1, version(0));
- assertEquals(schema1, latest);
+ assertEquals(schemaData1, latest);
deleteSchema(schemaId1, version(1));
@@ -121,154 +109,129 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
@Test
public void findSchemaVersionTest() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- assertEquals(0, schemaRegistryService.findSchemaVersion(schemaId1, schema1).get().longValue());
+ putSchema(schemaId1, schemaData1, version(0));
+ assertEquals(0, schemaRegistryService.findSchemaVersion(schemaId1, schemaData1).get().longValue());
}
@Test
public void deleteSchemaAndAddSchema() throws Exception {
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
SchemaData latest = getLatestSchema(schemaId1, version(0));
- assertEquals(schema1, latest);
+ assertEquals(schemaData1, latest);
deleteSchema(schemaId1, version(1));
assertNull(schemaRegistryService.getSchema(schemaId1).get());
- putSchema(schemaId1, schema1, version(2));
+ putSchema(schemaId1, schemaData1, version(2));
latest = getLatestSchema(schemaId1, version(2));
- assertEquals(schema1, latest);
+ assertEquals(schemaData1, latest);
}
@Test
public void getReturnsTheLastWrittenEntry() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema2, version(1));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
SchemaData latest = getLatestSchema(schemaId1, version(1));
- assertEquals(schema2, latest);
+ assertEquals(schemaData2, latest);
}
@Test
public void getByVersionReturnsTheCorrectEntry() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema2, version(1));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
SchemaData version0 = getSchema(schemaId1, version(0));
- assertEquals(schema1, version0);
+ assertEquals(schemaData1, version0);
}
@Test
public void getByVersionReturnsTheCorrectEntry2() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema2, version(1));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
SchemaData version1 = getSchema(schemaId1, version(1));
- assertEquals(schema2, version1);
+ assertEquals(schemaData2, version1);
}
@Test
public void getByVersionReturnsTheCorrectEntry3() throws Exception {
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
SchemaData version1 = getSchema(schemaId1, version(0));
- assertEquals(schema1, version1);
+ assertEquals(schemaData1, version1);
}
@Test
public void getAllVersionSchema() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema2, version(1));
- putSchema(schemaId1, schema3, version(2));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
+ putSchema(schemaId1, schemaData3, version(2));
List<SchemaData> allSchemas = getAllSchemas(schemaId1);
- assertEquals(schema1, allSchemas.get(0));
- assertEquals(schema2, allSchemas.get(1));
- assertEquals(schema3, allSchemas.get(2));
+ assertEquals(schemaData1, allSchemas.get(0));
+ assertEquals(schemaData2, allSchemas.get(1));
+ assertEquals(schemaData3, allSchemas.get(2));
}
@Test
public void addLotsOfEntriesThenDelete() throws Exception {
- SchemaData randomSchema1 = randomSchema();
- SchemaData randomSchema2 = randomSchema();
- SchemaData randomSchema3 = randomSchema();
- SchemaData randomSchema4 = randomSchema();
- SchemaData randomSchema5 = randomSchema();
- SchemaData randomSchema6 = randomSchema();
- SchemaData randomSchema7 = randomSchema();
-
- putSchema(schemaId1, randomSchema1, version(0));
- putSchema(schemaId1, randomSchema2, version(1));
- putSchema(schemaId1, randomSchema3, version(2));
- putSchema(schemaId1, randomSchema4, version(3));
- putSchema(schemaId1, randomSchema5, version(4));
- putSchema(schemaId1, randomSchema6, version(5));
- putSchema(schemaId1, randomSchema7, version(6));
+
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
+ putSchema(schemaId1, schemaData3, version(2));
SchemaData version0 = getSchema(schemaId1, version(0));
- assertEquals(randomSchema1, version0);
+ assertEquals(schemaData1, version0);
SchemaData version1 = getSchema(schemaId1, version(1));
- assertEquals(randomSchema2, version1);
+ assertEquals(schemaData2, version1);
SchemaData version2 = getSchema(schemaId1, version(2));
- assertEquals(randomSchema3, version2);
-
- SchemaData version3 = getSchema(schemaId1, version(3));
- assertEquals(randomSchema4, version3);
+ assertEquals(schemaData3, version2);
- SchemaData version4 = getSchema(schemaId1, version(4));
- assertEquals(randomSchema5, version4);
+ deleteSchema(schemaId1, version(3));
- SchemaData version5 = getSchema(schemaId1, version(5));
- assertEquals(randomSchema6, version5);
-
- SchemaData version6 = getSchema(schemaId1, version(6));
- assertEquals(randomSchema7, version6);
-
- deleteSchema(schemaId1, version(7));
-
- SchemaRegistry.SchemaAndMetadata version7 = schemaRegistryService.getSchema(schemaId1, version(7)).get();
- assertNull(version7);
+ SchemaRegistry.SchemaAndMetadata version3 = schemaRegistryService.getSchema(schemaId1, version(3)).get();
+ assertNull(version3);
}
@Test
public void writeSchemasToDifferentIds() throws Exception {
- SchemaData schemaWithDifferentId = schema3;
-
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
String schemaId2 = "id2";
- putSchema(schemaId2, schemaWithDifferentId, version(0));
+ putSchema(schemaId2, schemaData3, version(0));
SchemaData withFirstId = getLatestSchema(schemaId1, version(0));
SchemaData withDifferentId = getLatestSchema(schemaId2, version(0));
- assertEquals(schema1, withFirstId);
- assertEquals(schema3, withDifferentId);
+ assertEquals(schemaData1, withFirstId);
+ assertEquals(schemaData3, withDifferentId);
}
@Test
public void dontReAddExistingSchemaAtRoot() throws Exception {
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema1, version(0));
- putSchema(schemaId1, schema1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData1, version(0));
}
@Test
public void trimDeletedSchemaAndGetListTest() throws Exception {
List<SchemaAndMetadata> list = new ArrayList<>();
CompletableFuture<SchemaVersion> put = schemaRegistryService.putSchemaIfAbsent(
- schemaId1, schema1, SchemaCompatibilityStrategy.FULL);
+ schemaId1, schemaData1, SchemaCompatibilityStrategy.FULL);
SchemaVersion newVersion = put.get();
- list.add(new SchemaAndMetadata(schemaId1, schema1, newVersion));
+ list.add(new SchemaAndMetadata(schemaId1, schemaData1, newVersion));
put = schemaRegistryService.putSchemaIfAbsent(
- schemaId1, schema2, SchemaCompatibilityStrategy.FULL);
+ schemaId1, schemaData2, SchemaCompatibilityStrategy.FULL);
newVersion = put.get();
- list.add(new SchemaAndMetadata(schemaId1, schema2, newVersion));
+ list.add(new SchemaAndMetadata(schemaId1, schemaData2, newVersion));
List<SchemaAndMetadata> list1 = schemaRegistryService.trimDeletedSchemaAndGetList(schemaId1).get();
assertEquals(list.size(), list1.size());
HashFunction hashFunction = Hashing.sha256();
@@ -285,34 +248,14 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
@Test
public void dontReAddExistingSchemaInMiddle() throws Exception {
- putSchema(schemaId1, randomSchema(), version(0));
- putSchema(schemaId1, schema2, version(1));
- putSchema(schemaId1, randomSchema(), version(2));
- putSchema(schemaId1, randomSchema(), version(3));
- putSchema(schemaId1, randomSchema(), version(4));
- putSchema(schemaId1, randomSchema(), version(5));
- putSchema(schemaId1, schema2, version(1));
+ putSchema(schemaId1, schemaData1, version(0));
+ putSchema(schemaId1, schemaData2, version(1));
+ putSchema(schemaId1, schemaData3, version(2));
+ putSchema(schemaId1, schemaData2, version(1));
}
@Test(expectedExceptions = ExecutionException.class)
public void checkIsCompatible() throws Exception {
- String schemaJson1 =
- "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
- ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
- SchemaData schemaData1 = getSchemaData(schemaJson1);
-
- String schemaJson2 =
- "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
- ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
- "{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
- SchemaData schemaData2 = getSchemaData(schemaJson2);
-
- String schemaJson3 =
- "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
- ".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
- "{\"name\":\"field2\",\"type\":\"string\"}]}";
- SchemaData schemaData3 = getSchemaData(schemaJson3);
-
putSchema(schemaId1, schemaData1, version(0), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
putSchema(schemaId1, schemaData2, version(1), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
@@ -381,7 +324,7 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
.build();
}
- private SchemaData getSchemaData(String schemaJson) {
+ private static SchemaData getSchemaData(String schemaJson) {
return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).user(userId).build();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index b8580fd..595da2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -21,12 +21,13 @@ package org.apache.pulsar.schema;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.Collections;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -37,6 +38,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -193,4 +195,15 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
consumer.close();
consumer1.close();
}
+
+ @Test
+ public void testIsUsingAvroSchemaParser() {
+ for (SchemaType value : SchemaType.values()) {
+ if (value == SchemaType.AVRO || value == SchemaType.JSON || value == SchemaType.PROTOBUF) {
+ assertTrue(SchemaRegistryServiceImpl.isUsingAvroSchemaParser(value));
+ } else {
+ assertFalse(SchemaRegistryServiceImpl.isUsingAvroSchemaParser(value));
+ }
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 37c1942..565aeb5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -35,6 +35,8 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.schema.Schemas;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -45,8 +47,11 @@ import org.testng.annotations.Test;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
@Slf4j
@@ -294,6 +299,53 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
producer.close();
}
+ @Test
+ public void testSchemaComparison() throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String topic = "test-schema-comparison";
+
+ String namespace = "test-namespace-" + randomName(16);
+ String fqtn = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topic
+ ).toString();
+
+ NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(CLUSTER_NAME)
+ );
+
+ assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
+ SchemaCompatibilityStrategy.FULL);
+ byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
+ .getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes();
+ SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
+ admin.schemas().createSchema(fqtn, schemaInfo);
+
+ admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
+ ProducerBuilder<Schemas.PersonOne> producerOneBuilder = pulsarClient
+ .newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(fqtn);
+ producerOneBuilder.create().close();
+
+ assertArrayEquals(changeSchemaBytes, admin.schemas().getSchemaInfo(fqtn).getSchema());
+
+ ProducerBuilder<Schemas.PersonThree> producerThreeBuilder = pulsarClient
+ .newProducer(Schema.AVRO(Schemas.PersonThree.class))
+ .topic(fqtn);
+
+ try {
+ producerThreeBuilder.create();
+ fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
+ }
+ }
+
@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;