You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/09/01 08:07:43 UTC

[pulsar] branch master updated: [Schema] Schema compatibility strategy in broker level. (#11856)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b079c1e  [Schema] Schema compatibility strategy in broker level. (#11856)
b079c1e is described below

commit b079c1e047975179260b64aa9f70c8f384580aba
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Sep 1 16:07:04 2021 +0800

    [Schema] Schema compatibility strategy in broker level. (#11856)
    
    ## Motivation
    Related to #11849.
    Add a broker-level configuration for the schema compatibility strategy.
    
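    ## Implementation
    If a namespace's schema compatibility strategy is `UNDEFINED`, the broker-level schema compatibility strategy is used instead. If the broker-level strategy is also `UNDEFINED`, the strategy is derived from the namespace's schema auto-update compatibility policy, as before.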
---
 conf/broker.conf                                   |  7 ++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 10 +++++++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  7 ++++--
 .../broker/admin/impl/SchemasResourceBase.java     |  8 +++++--
 .../pulsar/broker/service/AbstractTopic.java       | 10 ++++++---
 .../SchemaTypeCompatibilityCheckTest.java          | 25 ++++++++++++++++++++++
 6 files changed, 60 insertions(+), 7 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index bb6344f..46ddb7d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1200,6 +1200,13 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
 #   if you enable this setting, it will cause non-java clients failed to produce.
 isSchemaValidationEnforced=false
 
+# Broker-level schema compatibility strategy, used when the namespace policy's strategy is `UNDEFINED`.
+# If this is also `UNDEFINED`, the namespace's auto-update compatibility strategy applies (`FULL` by default).
+# SchemaCompatibilityStrategy : UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD,
+# FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
+# default : UNDEFINED
+schemaCompatibilityStrategy=
+
 ### --- Ledger Offloading --- ###
 
 # The directory for all the offloader implementations
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 12ffc57..c554c8a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.sasl.SaslConstants;
@@ -1940,11 +1941,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
             + "   if you enable this setting, it will cause non-java clients failed to produce."
     )
     private boolean isSchemaValidationEnforced = false;
+
     @FieldContext(
         category = CATEGORY_SCHEMA,
         doc = "The schema storage implementation used by this broker"
     )
     private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
+
     @FieldContext(
         category = CATEGORY_SCHEMA,
         doc = "The list compatibility checkers to be used in schema registry"
@@ -1955,6 +1958,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
             "org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck"
     );
 
+    @FieldContext(
+            category = CATEGORY_SCHEMA,
+            doc = "The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`"
+                    + ", schema compatibility strategy check will use it in broker level."
+    )
+    private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.UNDEFINED;
+
     /**** --- WebSocket --- ****/
     @FieldContext(
         category = CATEGORY_WEBSOCKET,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index e70e230..738d22e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2402,8 +2402,11 @@ public abstract class NamespacesBase extends AdminResource {
         Policies policies = getNamespacePolicies(namespaceName);
         SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
         if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
-            schemaCompatibilityStrategy = SchemaCompatibilityStrategy
-                    .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
+            schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
+            if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
+                schemaCompatibilityStrategy = SchemaCompatibilityStrategy
+                        .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
+            }
         }
         return schemaCompatibilityStrategy;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index e5299cb..96e96e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -139,8 +139,12 @@ public class SchemasResourceBase extends AdminResource {
         getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
             SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
             if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
-                schemaCompatibilityStrategy = SchemaCompatibilityStrategy
-                        .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
+                schemaCompatibilityStrategy =
+                        pulsar().getConfig().getSchemaCompatibilityStrategy();
+                if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
+                    schemaCompatibilityStrategy = SchemaCompatibilityStrategy
+                            .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
+                }
             }
             byte[] data;
             if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4080d5b..58ef410 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -543,10 +543,14 @@ public abstract class AbstractTopic implements Topic {
         PUBLISH_LATENCY.observe(latency, unit);
     }
 
-    protected void setSchemaCompatibilityStrategy (Policies policies) {
+    protected void setSchemaCompatibilityStrategy(Policies policies) {
         if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
-            schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
-                    policies.schema_auto_update_compatibility_strategy);
+            schemaCompatibilityStrategy = brokerService.pulsar()
+                    .getConfig().getSchemaCompatibilityStrategy();
+            if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
+                schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                        policies.schema_auto_update_compatibility_strategy);
+            }
         } else {
             schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index 7fa08ee..1367d4d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -79,6 +80,30 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
     }
 
     @Test
+    public void testSchemaCompatibilityStrategyInBrokerLevel() throws PulsarClientException {
+        conf.setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "testSchemaCompatibilityStrategyInBrokerLevel"
+        ).toString();
+
+        pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonOne>builder().
+                withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build()))
+                .topic(topicName)
+                .create();
+
+        ProducerBuilder<Schemas.PersonThree> producerBuilder = pulsarClient.newProducer(Schema.AVRO(SchemaDefinition
+                .<Schemas.PersonThree>builder().withAlwaysAllowNull(true).withPojo(Schemas.PersonThree.class).build()))
+                .topic(topicName);
+
+        Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create);
+        assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema"));
+    }
+
+    @Test
     public void structTypeProducerProducerUndefinedCompatible() throws Exception {
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);