You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 00:56:52 UTC
[pulsar] 06/15: [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 012e1b1a5541d0ca17c65efc1f4bb905ad76ea7b
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Thu Nov 18 20:37:37 2021 +0800
[Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)
(cherry picked from commit fa7be236efcc6772e0aac05f25f8d5f3cf0ad741)
---
conf/broker.conf | 4 ++
conf/standalone.conf | 4 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 8 +++
.../pulsar/broker/service/AbstractTopic.java | 38 ++++++----
.../SchemaCompatibilityCheckTest.java | 81 ++++++++++++++++++++++
.../pulsar/common/policies/data/Policies.java | 2 +-
6 files changed, 121 insertions(+), 16 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 3dd8590..2d7df90 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -270,6 +270,10 @@ brokerMaxConnections=0
# The maximum number of connections per IP. If it exceeds, new connections are rejected.
brokerMaxConnectionsPerIp=0
+# Allow schemas to be auto-updated at the broker level. Users can override this with
+# the 'is_allow_auto_update_schema' namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 878a852..577a6ff 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -176,6 +176,10 @@ defaultNumberOfNamespaceBundles=4
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0
+# Allow schemas to be auto-updated at the broker level. Users can override this with
+# the 'is_allow_auto_update_schema' namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 69a3d35..0d578f4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -573,6 +573,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int brokerMaxConnectionsPerIp = 0;
@FieldContext(
+ category = CATEGORY_POLICIES,
+ dynamic = true,
+ doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'"
+ + " of namespace policy. This is enabled by default."
+ )
+ private boolean isAllowAutoUpdateSchemaEnabled = true;
+
+ @FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Enable check for minimum allowed client library version"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 9463be9..30c0fae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -95,7 +95,7 @@ public abstract class AbstractTopic implements Topic {
protected volatile boolean isEncryptionRequired = false;
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
- protected volatile boolean isAllowAutoUpdateSchema = true;
+ protected volatile Boolean isAllowAutoUpdateSchema;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;
@@ -328,20 +328,28 @@ public abstract class AbstractTopic implements Topic {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
- return isAllowAutoUpdateSchema ? schemaRegistryService
- .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
- : schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
- schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
- .thenCompose(schemaVersion -> {
- if (schemaVersion == null) {
- return FutureUtil
- .failedFuture(
- new IncompatibleSchemaException(
- "Schema not found and schema auto updating is disabled."));
- } else {
- return CompletableFuture.completedFuture(schemaVersion);
- }
- }));
+
+ if (allowAutoUpdateSchema()) {
+ return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
+ } else {
+ return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
+ schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
+ .thenCompose(schemaVersion -> {
+ if (schemaVersion == null) {
+ return FutureUtil.failedFuture(new IncompatibleSchemaException(
+ "Schema not found and schema auto updating is disabled."));
+ } else {
+ return CompletableFuture.completedFuture(schemaVersion);
+ }
+ }));
+ }
+ }
+
+ private boolean allowAutoUpdateSchema() {
+ if (isAllowAutoUpdateSchema == null) {
+ return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
+ }
+ return isAllowAutoUpdateSchema;
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 293f71d..80168b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -218,6 +218,87 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
+ public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy)
+ throws Exception {
+
+ final String tenant = PUBLIC_TENANT;
+ final String topic = "test-consumer-compatibility";
+ String namespace = "test-namespace-" + randomName(16);
+ String fqtn = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topic
+ ).toString();
+
+ NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(CLUSTER_NAME)
+ );
+
+ assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
+ SchemaCompatibilityStrategy.FULL);
+
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
+ admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
+
+
+ pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+ ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
+ .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+ (false).withSupportSchemaVersioning(true).
+ withPojo(Schemas.PersonTwo.class).build()))
+ .topic(fqtn);
+ try {
+ producerThreeBuilder.create();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
+ }
+
+ pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
+ ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
+ SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+ (false).withSupportSchemaVersioning(true).
+ withPojo(Schemas.PersonTwo.class).build()))
+ .subscriptionName("test")
+ .topic(fqtn);
+
+ Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
+ Consumer<Schemas.PersonTwo> consumerTwo = comsumerBuilder.subscribe();
+
+ producer.send(new Schemas.PersonTwo(2, "Lucy"));
+ Message<Schemas.PersonTwo> message = consumerTwo.receive();
+
+ Schemas.PersonTwo personTwo = message.getValue();
+ consumerTwo.acknowledge(message);
+
+ assertEquals(personTwo.getId(), 2);
+ assertEquals(personTwo.getName(), "Lucy");
+
+ producer.close();
+ consumerTwo.close();
+
+ pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+
+ producer = producerThreeBuilder.create();
+ consumerTwo = comsumerBuilder.subscribe();
+
+ producer.send(new Schemas.PersonTwo(2, "Lucy"));
+ message = consumerTwo.receive();
+
+ personTwo = message.getValue();
+ consumerTwo.acknowledge(message);
+
+ assertEquals(personTwo.getId(), 2);
+ assertEquals(personTwo.getName(), "Lucy");
+
+ consumerTwo.close();
+ producer.close();
+ }
+
@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 631675f..d26c61e 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -108,7 +108,7 @@ public class Policies {
public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED;
@SuppressWarnings("checkstyle:MemberName")
- public boolean is_allow_auto_update_schema = true;
+ public Boolean is_allow_auto_update_schema = null;
@SuppressWarnings("checkstyle:MemberName")
public boolean schema_validation_enforced = false;