You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/26 00:28:18 UTC
[pulsar] branch master updated: Fix schema type check issue when
use always compatible strategy (#10367)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 04f8c96 Fix schema type check issue when use always compatible strategy (#10367)
04f8c96 is described below
commit 04f8c96a6c0c1c93cd495f46fb33d6e44d6004ea
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Apr 26 08:27:44 2021 +0800
Fix schema type check issue when use always compatible strategy (#10367)
Related to #9797
### Motivation
Fix the schema type check issue that occurs when using the always-compatible strategy.
1. For non-transitive strategies, only check the schema type against the latest schema
2. For transitive strategies, check the schema type against all existing schemas
3. Looking up a schema version by schema data should also take the schema type into account
---
.../service/schema/SchemaRegistryServiceImpl.java | 76 ++++---
.../SchemaTypeCompatibilityCheckTest.java | 225 ++++-----------------
2 files changed, 92 insertions(+), 209 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index b06531a..f3afd3d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -140,15 +140,6 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
SchemaCompatibilityStrategy strategy) {
return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> {
- if (strategy != SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE && schemaAndMetadataList.size() > 0) {
- for (SchemaAndMetadata metadata : schemaAndMetadataList) {
- if (schema.getType() != metadata.schema.getType()) {
- return FutureUtil.failedFuture(new IncompatibleSchemaException(
- String.format("Incompatible schema: exists schema type %s, new schema type %s",
- metadata.schema.getType(), schema.getType())));
- }
- }
- }
if (schemaVersion != null) {
return CompletableFuture.completedFuture(schemaVersion);
}
@@ -299,6 +290,9 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(
List<SchemaAndMetadata> schemaAndMetadataList,
SchemaData schemaData) {
+ if (schemaAndMetadataList == null || schemaAndMetadataList.size() == 0) {
+ return CompletableFuture.completedFuture(null);
+ }
final CompletableFuture<SchemaVersion> completableFuture = new CompletableFuture<>();
SchemaVersion schemaVersion;
if (isUsingAvroSchemaParser(schemaData.getType())) {
@@ -309,14 +303,15 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
if (isUsingAvroSchemaParser(schemaData.getType())) {
Schema.Parser existParser = new Schema.Parser();
Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
- if (newSchema.equals(existSchema)) {
+ if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) {
schemaVersion = schemaAndMetadata.version;
completableFuture.complete(schemaVersion);
return completableFuture;
}
} else {
if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
- hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+ hashFunction.hashBytes(schemaData.getData()).asBytes())
+ && schemaAndMetadata.schema.getType() == schemaData.getType()) {
schemaVersion = schemaAndMetadata.version;
completableFuture.complete(schemaVersion);
return completableFuture;
@@ -326,7 +321,8 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
} else {
for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
- hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+ hashFunction.hashBytes(schemaData.getData()).asBytes())
+ && schemaAndMetadata.schema.getType() == schemaData.getType()) {
schemaVersion = schemaAndMetadata.version;
completableFuture.complete(schemaVersion);
return completableFuture;
@@ -339,14 +335,23 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
private CompletableFuture<Void> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
+ if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) {
+ return CompletableFuture.completedFuture(null);
+ }
return getSchema(schemaId).thenCompose(existingSchema -> {
if (existingSchema != null && !existingSchema.schema.isDeleted()) {
CompletableFuture<Void> result = new CompletableFuture<>();
- try {
- checkCompatible(existingSchema, schema, strategy);
- result.complete(null);
- } catch (IncompatibleSchemaException e) {
- result.completeExceptionally(e);
+ if (existingSchema.schema.getType() != schema.getType()) {
+ result.completeExceptionally(new IncompatibleSchemaException(
+ String.format("Incompatible schema: exists schema type %s, new schema type %s",
+ existingSchema.schema.getType(), schema.getType())));
+ } else {
+ try {
+ checkCompatible(existingSchema, schema, strategy);
+ result.complete(null);
+ } catch (IncompatibleSchemaException e) {
+ result.completeExceptionally(e);
+ }
}
return result;
} else {
@@ -366,18 +371,35 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
SchemaCompatibilityStrategy strategy,
List<SchemaAndMetadata> schemaAndMetadataList) {
CompletableFuture<Void> result = new CompletableFuture<>();
- try {
- compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
- .checkCompatible(schemaAndMetadataList
- .stream()
- .map(schemaAndMetadata -> schemaAndMetadata.schema)
- .collect(Collectors.toList()), schema, strategy);
+ if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
result.complete(null);
- } catch (Exception e) {
- if (e instanceof IncompatibleSchemaException) {
- result.completeExceptionally(e);
+ } else {
+ SchemaAndMetadata breakSchema = null;
+ for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+ if (schemaAndMetadata.schema.getType() != schema.getType()) {
+ breakSchema = schemaAndMetadata;
+ break;
+ }
+ }
+ if (breakSchema == null) {
+ try {
+ compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
+ .checkCompatible(schemaAndMetadataList
+ .stream()
+ .map(schemaAndMetadata -> schemaAndMetadata.schema)
+ .collect(Collectors.toList()), schema, strategy);
+ result.complete(null);
+ } catch (Exception e) {
+ if (e instanceof IncompatibleSchemaException) {
+ result.completeExceptionally(e);
+ } else {
+ result.completeExceptionally(new IncompatibleSchemaException(e));
+ }
+ }
} else {
- result.completeExceptionally(new IncompatibleSchemaException(e));
+ result.completeExceptionally(new IncompatibleSchemaException(
+ String.format("Incompatible schema: exists schema type %s, new schema type %s",
+ breakSchema.schema.getType(), schema.getType())));
}
}
return result;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index c24822f..bc711c5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -30,14 +30,18 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.schema.Schemas;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
@@ -173,101 +177,6 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
}
@Test
- public void structTypeProducerProducerAlwaysCompatible() throws Exception {
- admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
- String topicName = TopicName.get(
- TopicDomain.persistent.value(),
- PUBLIC_TENANT,
- namespace,
- "structTypeProducerProducerAlwaysCompatible"
- ).toString();
-
- pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
- .topic(topicName)
- .create();
-
- pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
- .topic(topicName)
- .create();
- }
-
- @Test
- public void structTypeProducerConsumerAlwaysCompatible() throws Exception {
- admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
- final String subName = "my-sub";
- String topicName = TopicName.get(
- TopicDomain.persistent.value(),
- PUBLIC_TENANT,
- namespace,
- "structTypeProducerConsumerAlwaysCompatible"
- ).toString();
-
- pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
- .topic(topicName)
- .create();
-
- pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
- .topic(topicName)
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName)
- .subscribe();
- }
-
- @Test
- public void structTypeConsumerProducerAlwaysCompatible() throws Exception {
- admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
- final String subName = "my-sub";
- String topicName = TopicName.get(
- TopicDomain.persistent.value(),
- PUBLIC_TENANT,
- namespace,
- "structTypeConsumerProducerAlwaysCompatible"
- ).toString();
-
- pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
- .topic(topicName)
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName)
- .subscribe();
-
- pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
- .topic(topicName)
- .create();
- }
-
- @Test
- public void structTypeConsumerConsumerAlwaysCompatible() throws Exception {
- admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
- final String subName = "my-sub";
- String topicName = TopicName.get(
- TopicDomain.persistent.value(),
- PUBLIC_TENANT,
- namespace,
- "structTypeConsumerConsumerAlwaysCompatible"
- ).toString();
-
- pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
- .topic(topicName)
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName + "1")
- .subscribe();
-
- pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
- .topic(topicName)
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName + "2")
- .subscribe();
- }
-
- @Test
public void primitiveTypeProducerProducerUndefinedCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
@@ -371,98 +280,50 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
}
@Test
- public void primitiveTypeProducerProducerAlwaysCompatible() throws Exception {
+ public void testAlwaysCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
- String topicName = TopicName.get(
+ final String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
- "primitiveTypeProducerProducerAlwaysCompatible"
+ "testAlwaysCompatible" + UUID.randomUUID().toString()
).toString();
-
- pulsarClient.newProducer(Schema.INT32)
- .topic(topicName)
- .create();
-
- pulsarClient.newProducer(Schema.STRING)
- .topic(topicName)
- .create();
- }
-
- @Test
- public void primitiveTypeProducerConsumerAlwaysCompatible() throws Exception {
- admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
- final String subName = "my-sub";
- String topicName = TopicName.get(
- TopicDomain.persistent.value(),
- PUBLIC_TENANT,
- namespace,
- "primitiveTypeProducerConsumerAlwaysCompatible"
- ).toString();
-
- pulsarClient.newProducer(Schema.INT32)
- .topic(topicName)
- .create();
-
- pulsarClient.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName + "2")
- .subscribe();
- }
-
- @Test
- public void primitiveTypeConsumerProducerAlwaysCompatible() throws Exception {
- admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
- final String subName = "my-sub";
- String topicName = TopicName.get(
- TopicDomain.persistent.value(),
- PUBLIC_TENANT,
- namespace,
- "primitiveTypeConsumerProducerAlwaysCompatible"
- ).toString();
-
- pulsarClient.newConsumer(Schema.INT32)
- .topic(topicName)
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName)
- .subscribe();
-
- pulsarClient.newProducer(Schema.STRING)
- .topic(topicName)
- .create();
- }
-
- @Test
- public void primitiveTypeConsumerConsumerAlwaysCompatible() throws Exception {
- admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
- final String subName = "my-sub";
- String topicName = TopicName.get(
- TopicDomain.persistent.value(),
- PUBLIC_TENANT,
- namespace,
- "primitiveTypeConsumerConsumerAlwaysCompatible"
- ).toString();
-
- pulsarClient.newConsumer(Schema.INT32)
- .topic(topicName)
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName + "1")
- .subscribe();
-
- pulsarClient.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionName(subName + "2")
- .subscribe();
+ Schema<?>[] schemas = new Schema[] {
+ Schema.AVRO(Schemas.PersonOne.class),
+ Schema.AVRO(Schemas.PersonFour.class),
+ Schema.JSON(Schemas.PersonOne.class),
+ Schema.JSON(Schemas.PersonFour.class),
+ Schema.INT8,
+ Schema.INT16,
+ Schema.INT32,
+ Schema.INT64,
+ Schema.DATE,
+ Schema.BOOL,
+ Schema.DOUBLE,
+ Schema.STRING,
+ Schema.BYTES,
+ Schema.FLOAT,
+ Schema.INSTANT,
+ Schema.BYTEBUFFER,
+ Schema.TIME,
+ Schema.TIMESTAMP,
+ Schema.LOCAL_DATE,
+ Schema.LOCAL_DATE_TIME,
+ Schema.LOCAL_TIME
+ };
+
+ for (Schema<?> schema : schemas) {
+ pulsarClient.newProducer(schema)
+ .topic(topicName)
+ .create();
+ }
+
+ for (Schema<?> schema : schemas) {
+ pulsarClient.newConsumer(schema)
+ .topic(topicName)
+ .subscriptionName(UUID.randomUUID().toString())
+ .subscribe();
+ }
}
}