You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/09/01 08:07:43 UTC
[pulsar] branch master updated: [Schema] Schema compatibility
strategy in broker level. (#11856)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b079c1e [Schema] Schema compatibility strategy in broker level. (#11856)
b079c1e is described below
commit b079c1e047975179260b64aa9f70c8f384580aba
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Sep 1 16:07:04 2021 +0800
[Schema] Schema compatibility strategy in broker level. (#11856)
## Motivation
link #11849
Add a broker-level configuration option for the schema compatibility strategy.
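## Implementation
If the namespace schema compatibility strategy is `UNDEFINED`, use the broker-level schema compatibility strategy. If that is also `UNDEFINED`, fall back to the strategy derived from the namespace's schema auto-update compatibility policy.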
---
conf/broker.conf | 7 ++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 10 +++++++++
.../pulsar/broker/admin/impl/NamespacesBase.java | 7 ++++--
.../broker/admin/impl/SchemasResourceBase.java | 8 +++++--
.../pulsar/broker/service/AbstractTopic.java | 10 ++++++---
.../SchemaTypeCompatibilityCheckTest.java | 25 ++++++++++++++++++++++
6 files changed, 60 insertions(+), 7 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index bb6344f..46ddb7d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1200,6 +1200,13 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
# if you enable this setting, it will cause non-java clients failed to produce.
isSchemaValidationEnforced=false
+# The schema compatibility strategy at broker level. It applies when the namespace policy leaves the strategy
+# `UNDEFINED`. If it is also `UNDEFINED` here, the namespace auto-update strategy (`FULL` by default) is used.
+# SchemaCompatibilityStrategy : UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD,
+# FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
+# default : UNDEFINED
+schemaCompatibilityStrategy=
+
### --- Ledger Offloading --- ###
# The directory for all the offloader implementations
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 12ffc57..c554c8a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
@@ -1940,11 +1941,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " if you enable this setting, it will cause non-java clients failed to produce."
)
private boolean isSchemaValidationEnforced = false;
+
@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The schema storage implementation used by this broker"
)
private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
+
@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The list compatibility checkers to be used in schema registry"
@@ -1955,6 +1958,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
"org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck"
);
+ @FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`"
+ + ", schema compatibility strategy check will use it in broker level."
+ )
+ private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.UNDEFINED;
+
/**** --- WebSocket --- ****/
@FieldContext(
category = CATEGORY_WEBSOCKET,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index e70e230..738d22e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2402,8 +2402,11 @@ public abstract class NamespacesBase extends AdminResource {
Policies policies = getNamespacePolicies(namespaceName);
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
- schemaCompatibilityStrategy = SchemaCompatibilityStrategy
- .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
+ schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
+ if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
+ schemaCompatibilityStrategy = SchemaCompatibilityStrategy
+ .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
+ }
}
return schemaCompatibilityStrategy;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index e5299cb..96e96e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -139,8 +139,12 @@ public class SchemasResourceBase extends AdminResource {
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
- schemaCompatibilityStrategy = SchemaCompatibilityStrategy
- .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
+ schemaCompatibilityStrategy =
+ pulsar().getConfig().getSchemaCompatibilityStrategy();
+ if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
+ schemaCompatibilityStrategy = SchemaCompatibilityStrategy
+ .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
+ }
}
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4080d5b..58ef410 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -543,10 +543,14 @@ public abstract class AbstractTopic implements Topic {
PUBLISH_LATENCY.observe(latency, unit);
}
- protected void setSchemaCompatibilityStrategy (Policies policies) {
+ protected void setSchemaCompatibilityStrategy(Policies policies) {
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
- schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
- policies.schema_auto_update_compatibility_strategy);
+ schemaCompatibilityStrategy = brokerService.pulsar()
+ .getConfig().getSchemaCompatibilityStrategy();
+ if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
+ schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+ policies.schema_auto_update_compatibility_strategy);
+ }
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index 7fa08ee..1367d4d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -79,6 +80,30 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
}
@Test
+ public void testSchemaCompatibilityStrategyInBrokerLevel() throws PulsarClientException {
+ conf.setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+
+ String topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ PUBLIC_TENANT,
+ namespace,
+ "testSchemaCompatibilityStrategyInBrokerLevel"
+ ).toString();
+
+ pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonOne>builder().
+ withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build()))
+ .topic(topicName)
+ .create();
+
+ ProducerBuilder<Schemas.PersonThree> producerBuilder = pulsarClient.newProducer(Schema.AVRO(SchemaDefinition
+ .<Schemas.PersonThree>builder().withAlwaysAllowNull(true).withPojo(Schemas.PersonThree.class).build()))
+ .topic(topicName);
+
+ Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create);
+ assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema"));
+ }
+
+ @Test
public void structTypeProducerProducerUndefinedCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);