You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/27 08:06:36 UTC

[pulsar] 01/03: Fix schema type check issue when use always compatible strategy (#10367)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22ac4c842e1f64cd1d84d54ee2bb4c10fb080caf
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Apr 26 08:27:44 2021 +0800

    Fix schema type check issue when use always compatible strategy (#10367)
    
    Related to #9797
    
    Fix the schema type check issue when using the ALWAYS_COMPATIBLE strategy.
    
    1. For non-transitive strategies, only check the schema type against the latest schema.
    2. For transitive strategies, check the schema type against all existing schemas.
    3. Looking up a schema version by schema data should also compare the schema types.
    
    (cherry picked from commit 04f8c96a6c0c1c93cd495f46fb33d6e44d6004ea)
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  75 ++++---
 .../SchemaTypeCompatibilityCheckTest.java          | 225 ++++-----------------
 2 files changed, 92 insertions(+), 208 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 37f7c56..0f493ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -141,15 +141,6 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                                                               SchemaCompatibilityStrategy strategy) {
         return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
                 getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> {
-            if (strategy != SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE && schemaAndMetadataList.size() > 0) {
-                for (SchemaAndMetadata metadata : schemaAndMetadataList) {
-                    if (schema.getType() != metadata.schema.getType()) {
-                        return FutureUtil.failedFuture(new IncompatibleSchemaException(
-                                String.format("Incompatible schema: exists schema type %s, new schema type %s",
-                                metadata.schema.getType(), schema.getType())));
-                    }
-                }
-            }
             if (schemaVersion != null) {
                 return CompletableFuture.completedFuture(schemaVersion);
             }
@@ -298,6 +289,9 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(
             List<SchemaAndMetadata> schemaAndMetadataList,
             SchemaData schemaData) {
+        if (schemaAndMetadataList == null || schemaAndMetadataList.size() == 0) {
+            return CompletableFuture.completedFuture(null);
+        }
         final CompletableFuture<SchemaVersion> completableFuture = new CompletableFuture<>();
         SchemaVersion schemaVersion;
         if (isUsingAvroSchemaParser(schemaData.getType())) {
@@ -308,14 +302,15 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                 if (isUsingAvroSchemaParser(schemaData.getType())) {
                     Schema.Parser existParser = new Schema.Parser();
                     Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
-                    if (newSchema.equals(existSchema)) {
+                    if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) {
                         schemaVersion = schemaAndMetadata.version;
                         completableFuture.complete(schemaVersion);
                         return completableFuture;
                     }
                 } else {
                     if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
-                            hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                            hashFunction.hashBytes(schemaData.getData()).asBytes())
+                            && schemaAndMetadata.schema.getType() == schemaData.getType()) {
                         schemaVersion = schemaAndMetadata.version;
                         completableFuture.complete(schemaVersion);
                         return completableFuture;
@@ -325,7 +320,8 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
         } else {
             for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
                 if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
-                        hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                        hashFunction.hashBytes(schemaData.getData()).asBytes())
+                        && schemaAndMetadata.schema.getType() == schemaData.getType()) {
                     schemaVersion = schemaAndMetadata.version;
                     completableFuture.complete(schemaVersion);
                     return completableFuture;
@@ -338,14 +334,23 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
 
     private CompletableFuture<Void> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
                                                                     SchemaCompatibilityStrategy strategy) {
+        if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) {
+            return CompletableFuture.completedFuture(null);
+        }
         return getSchema(schemaId).thenCompose(existingSchema -> {
             if (existingSchema != null && !existingSchema.schema.isDeleted()) {
                 CompletableFuture<Void> result = new CompletableFuture<>();
-                try {
-                    checkCompatible(existingSchema, schema, strategy);
-                    result.complete(null);
-                } catch (IncompatibleSchemaException e) {
-                    result.completeExceptionally(e);
+                if (existingSchema.schema.getType() != schema.getType()) {
+                    result.completeExceptionally(new IncompatibleSchemaException(
+                            String.format("Incompatible schema: exists schema type %s, new schema type %s",
+                                    existingSchema.schema.getType(), schema.getType())));
+                } else {
+                    try {
+                        checkCompatible(existingSchema, schema, strategy);
+                        result.complete(null);
+                    } catch (IncompatibleSchemaException e) {
+                        result.completeExceptionally(e);
+                    }
                 }
                 return result;
             } else {
@@ -365,17 +370,35 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                                                               SchemaCompatibilityStrategy strategy,
                                                               List<SchemaAndMetadata> schemaAndMetadataList) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        try {
-            compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT).checkCompatible(schemaAndMetadataList
-                    .stream()
-                    .map(schemaAndMetadata -> schemaAndMetadata.schema)
-                    .collect(Collectors.toList()), schema, strategy);
+        if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
             result.complete(null);
-        } catch (Exception e) {
-            if (e instanceof IncompatibleSchemaException) {
-                result.completeExceptionally(e);
+        } else {
+            SchemaAndMetadata breakSchema = null;
+            for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+                if (schemaAndMetadata.schema.getType() != schema.getType()) {
+                    breakSchema = schemaAndMetadata;
+                    break;
+                }
+            }
+            if (breakSchema == null) {
+                try {
+                    compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
+                            .checkCompatible(schemaAndMetadataList
+                                    .stream()
+                                    .map(schemaAndMetadata -> schemaAndMetadata.schema)
+                                    .collect(Collectors.toList()), schema, strategy);
+                    result.complete(null);
+                } catch (Exception e) {
+                    if (e instanceof IncompatibleSchemaException) {
+                        result.completeExceptionally(e);
+                    } else {
+                        result.completeExceptionally(new IncompatibleSchemaException(e));
+                    }
+                }
             } else {
-                result.completeExceptionally(new IncompatibleSchemaException(e));
+                result.completeExceptionally(new IncompatibleSchemaException(
+                        String.format("Incompatible schema: exists schema type %s, new schema type %s",
+                                breakSchema.schema.getType(), schema.getType())));
             }
         }
         return result;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index c24822f..bc711c5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -30,14 +30,18 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.schema.Schemas;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 
@@ -173,101 +177,6 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
     }
 
     @Test
-    public void structTypeProducerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeProducerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void structTypeProducerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeProducerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-    }
-
-    @Test
-    public void structTypeConsumerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeConsumerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-
-        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void structTypeConsumerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeConsumerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "1")
-                .subscribe();
-
-        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
-    }
-
-    @Test
     public void primitiveTypeProducerProducerUndefinedCompatible() throws Exception {
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
 
@@ -371,98 +280,50 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
     }
 
     @Test
-    public void primitiveTypeProducerProducerAlwaysCompatible() throws Exception {
+    public void testAlwaysCompatible() throws Exception {
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        String topicName = TopicName.get(
+        final String topicName = TopicName.get(
                 TopicDomain.persistent.value(),
                 PUBLIC_TENANT,
                 namespace,
-                "primitiveTypeProducerProducerAlwaysCompatible"
+                "testAlwaysCompatible" + UUID.randomUUID().toString()
         ).toString();
-
-        pulsarClient.newProducer(Schema.INT32)
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newProducer(Schema.STRING)
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void primitiveTypeProducerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeProducerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.INT32)
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newConsumer(Schema.STRING)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
-    }
-
-    @Test
-    public void primitiveTypeConsumerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeConsumerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.INT32)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-
-        pulsarClient.newProducer(Schema.STRING)
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void primitiveTypeConsumerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeConsumerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.INT32)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "1")
-                .subscribe();
-
-        pulsarClient.newConsumer(Schema.STRING)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
+        Schema<?>[] schemas = new Schema[] {
+                Schema.AVRO(Schemas.PersonOne.class),
+                Schema.AVRO(Schemas.PersonFour.class),
+                Schema.JSON(Schemas.PersonOne.class),
+                Schema.JSON(Schemas.PersonFour.class),
+                Schema.INT8,
+                Schema.INT16,
+                Schema.INT32,
+                Schema.INT64,
+                Schema.DATE,
+                Schema.BOOL,
+                Schema.DOUBLE,
+                Schema.STRING,
+                Schema.BYTES,
+                Schema.FLOAT,
+                Schema.INSTANT,
+                Schema.BYTEBUFFER,
+                Schema.TIME,
+                Schema.TIMESTAMP,
+                Schema.LOCAL_DATE,
+                Schema.LOCAL_DATE_TIME,
+                Schema.LOCAL_TIME
+        };
+
+        for (Schema<?> schema : schemas) {
+            pulsarClient.newProducer(schema)
+                    .topic(topicName)
+                    .create();
+        }
+
+        for (Schema<?> schema : schemas) {
+            pulsarClient.newConsumer(schema)
+                    .topic(topicName)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .subscribe();
+        }
     }
 
 }