You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/26 00:28:18 UTC

[pulsar] branch master updated: Fix schema type check issue when use always compatible strategy (#10367)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 04f8c96  Fix schema type check issue when use always compatible strategy (#10367)
04f8c96 is described below

commit 04f8c96a6c0c1c93cd495f46fb33d6e44d6004ea
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Apr 26 08:27:44 2021 +0800

    Fix schema type check issue when use always compatible strategy (#10367)
    
    Related to #9797
    
    ### Motivation
    
    Fix schema type check issue when use always compatible strategy.
    
    1. For non-transitive strategy, only check schema type for the last schema
    2. For transitive strategy, check all schema's type
    3. Get schema by schema data should consider different schema types
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  76 ++++---
 .../SchemaTypeCompatibilityCheckTest.java          | 225 ++++-----------------
 2 files changed, 92 insertions(+), 209 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index b06531a..f3afd3d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -140,15 +140,6 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                                                               SchemaCompatibilityStrategy strategy) {
         return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
                 getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> {
-            if (strategy != SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE && schemaAndMetadataList.size() > 0) {
-                for (SchemaAndMetadata metadata : schemaAndMetadataList) {
-                    if (schema.getType() != metadata.schema.getType()) {
-                        return FutureUtil.failedFuture(new IncompatibleSchemaException(
-                                String.format("Incompatible schema: exists schema type %s, new schema type %s",
-                                metadata.schema.getType(), schema.getType())));
-                    }
-                }
-            }
             if (schemaVersion != null) {
                 return CompletableFuture.completedFuture(schemaVersion);
             }
@@ -299,6 +290,9 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(
             List<SchemaAndMetadata> schemaAndMetadataList,
             SchemaData schemaData) {
+        if (schemaAndMetadataList == null || schemaAndMetadataList.size() == 0) {
+            return CompletableFuture.completedFuture(null);
+        }
         final CompletableFuture<SchemaVersion> completableFuture = new CompletableFuture<>();
         SchemaVersion schemaVersion;
         if (isUsingAvroSchemaParser(schemaData.getType())) {
@@ -309,14 +303,15 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                 if (isUsingAvroSchemaParser(schemaData.getType())) {
                     Schema.Parser existParser = new Schema.Parser();
                     Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
-                    if (newSchema.equals(existSchema)) {
+                    if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) {
                         schemaVersion = schemaAndMetadata.version;
                         completableFuture.complete(schemaVersion);
                         return completableFuture;
                     }
                 } else {
                     if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
-                            hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                            hashFunction.hashBytes(schemaData.getData()).asBytes())
+                            && schemaAndMetadata.schema.getType() == schemaData.getType()) {
                         schemaVersion = schemaAndMetadata.version;
                         completableFuture.complete(schemaVersion);
                         return completableFuture;
@@ -326,7 +321,8 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
         } else {
             for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
                 if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
-                        hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                        hashFunction.hashBytes(schemaData.getData()).asBytes())
+                        && schemaAndMetadata.schema.getType() == schemaData.getType()) {
                     schemaVersion = schemaAndMetadata.version;
                     completableFuture.complete(schemaVersion);
                     return completableFuture;
@@ -339,14 +335,23 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
 
     private CompletableFuture<Void> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
                                                                     SchemaCompatibilityStrategy strategy) {
+        if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) {
+            return CompletableFuture.completedFuture(null);
+        }
         return getSchema(schemaId).thenCompose(existingSchema -> {
             if (existingSchema != null && !existingSchema.schema.isDeleted()) {
                 CompletableFuture<Void> result = new CompletableFuture<>();
-                try {
-                    checkCompatible(existingSchema, schema, strategy);
-                    result.complete(null);
-                } catch (IncompatibleSchemaException e) {
-                    result.completeExceptionally(e);
+                if (existingSchema.schema.getType() != schema.getType()) {
+                    result.completeExceptionally(new IncompatibleSchemaException(
+                            String.format("Incompatible schema: exists schema type %s, new schema type %s",
+                                    existingSchema.schema.getType(), schema.getType())));
+                } else {
+                    try {
+                        checkCompatible(existingSchema, schema, strategy);
+                        result.complete(null);
+                    } catch (IncompatibleSchemaException e) {
+                        result.completeExceptionally(e);
+                    }
                 }
                 return result;
             } else {
@@ -366,18 +371,35 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                                                               SchemaCompatibilityStrategy strategy,
                                                               List<SchemaAndMetadata> schemaAndMetadataList) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        try {
-            compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
-                    .checkCompatible(schemaAndMetadataList
-                            .stream()
-                            .map(schemaAndMetadata -> schemaAndMetadata.schema)
-                            .collect(Collectors.toList()), schema, strategy);
+        if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
             result.complete(null);
-        } catch (Exception e) {
-            if (e instanceof IncompatibleSchemaException) {
-                result.completeExceptionally(e);
+        } else {
+            SchemaAndMetadata breakSchema = null;
+            for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+                if (schemaAndMetadata.schema.getType() != schema.getType()) {
+                    breakSchema = schemaAndMetadata;
+                    break;
+                }
+            }
+            if (breakSchema == null) {
+                try {
+                    compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
+                            .checkCompatible(schemaAndMetadataList
+                                    .stream()
+                                    .map(schemaAndMetadata -> schemaAndMetadata.schema)
+                                    .collect(Collectors.toList()), schema, strategy);
+                    result.complete(null);
+                } catch (Exception e) {
+                    if (e instanceof IncompatibleSchemaException) {
+                        result.completeExceptionally(e);
+                    } else {
+                        result.completeExceptionally(new IncompatibleSchemaException(e));
+                    }
+                }
             } else {
-                result.completeExceptionally(new IncompatibleSchemaException(e));
+                result.completeExceptionally(new IncompatibleSchemaException(
+                        String.format("Incompatible schema: exists schema type %s, new schema type %s",
+                                breakSchema.schema.getType(), schema.getType())));
             }
         }
         return result;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index c24822f..bc711c5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -30,14 +30,18 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.schema.Schemas;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 
@@ -173,101 +177,6 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
     }
 
     @Test
-    public void structTypeProducerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeProducerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void structTypeProducerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeProducerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-    }
-
-    @Test
-    public void structTypeConsumerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeConsumerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-
-        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void structTypeConsumerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeConsumerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "1")
-                .subscribe();
-
-        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
-    }
-
-    @Test
     public void primitiveTypeProducerProducerUndefinedCompatible() throws Exception {
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
 
@@ -371,98 +280,50 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
     }
 
     @Test
-    public void primitiveTypeProducerProducerAlwaysCompatible() throws Exception {
+    public void testAlwaysCompatible() throws Exception {
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        String topicName = TopicName.get(
+        final String topicName = TopicName.get(
                 TopicDomain.persistent.value(),
                 PUBLIC_TENANT,
                 namespace,
-                "primitiveTypeProducerProducerAlwaysCompatible"
+                "testAlwaysCompatible" + UUID.randomUUID().toString()
         ).toString();
-
-        pulsarClient.newProducer(Schema.INT32)
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newProducer(Schema.STRING)
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void primitiveTypeProducerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeProducerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.INT32)
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newConsumer(Schema.STRING)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
-    }
-
-    @Test
-    public void primitiveTypeConsumerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeConsumerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.INT32)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-
-        pulsarClient.newProducer(Schema.STRING)
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void primitiveTypeConsumerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeConsumerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.INT32)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "1")
-                .subscribe();
-
-        pulsarClient.newConsumer(Schema.STRING)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
+        Schema<?>[] schemas = new Schema[] {
+                Schema.AVRO(Schemas.PersonOne.class),
+                Schema.AVRO(Schemas.PersonFour.class),
+                Schema.JSON(Schemas.PersonOne.class),
+                Schema.JSON(Schemas.PersonFour.class),
+                Schema.INT8,
+                Schema.INT16,
+                Schema.INT32,
+                Schema.INT64,
+                Schema.DATE,
+                Schema.BOOL,
+                Schema.DOUBLE,
+                Schema.STRING,
+                Schema.BYTES,
+                Schema.FLOAT,
+                Schema.INSTANT,
+                Schema.BYTEBUFFER,
+                Schema.TIME,
+                Schema.TIMESTAMP,
+                Schema.LOCAL_DATE,
+                Schema.LOCAL_DATE_TIME,
+                Schema.LOCAL_TIME
+        };
+
+        for (Schema<?> schema : schemas) {
+            pulsarClient.newProducer(schema)
+                    .topic(topicName)
+                    .create();
+        }
+
+        for (Schema<?> schema : schemas) {
+            pulsarClient.newConsumer(schema)
+                    .topic(topicName)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .subscribe();
+        }
     }
 
 }